diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 344a778..670df2a 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,8 +1,8 @@ - 1.0.0-alpha.24 - 1.0.0-alpha.24 + 1.0.0-alpha.25 + 1.0.0-alpha.25 Zapto https://github.com/zapto-dev/Mediator Copyright © 2025 Zapto diff --git a/src/Mediator.DependencyInjection/DefaultBackgroundPublisher.cs b/src/Mediator.DependencyInjection/DefaultBackgroundPublisher.cs index 55da2df..ae3c4b3 100644 --- a/src/Mediator.DependencyInjection/DefaultBackgroundPublisher.cs +++ b/src/Mediator.DependencyInjection/DefaultBackgroundPublisher.cs @@ -18,49 +18,62 @@ public DefaultBackgroundPublisher(IServiceScopeFactory scopeFactory) /// public void Publish(object notification) { - _ = Task.Run(async () => + ObserveTask(Task.Run(async () => { using var scope = _scopeFactory.CreateScope(); var mediator = scope.ServiceProvider.GetRequiredService(); await mediator.Publish(notification, CancellationToken.None); - }, CancellationToken.None); + }, CancellationToken.None)); } /// public void Publish(MediatorNamespace ns, object notification) { - _ = Task.Run(async () => + ObserveTask(Task.Run(async () => { using var scope = _scopeFactory.CreateScope(); var mediator = scope.ServiceProvider.GetRequiredService(); await mediator.Publish(ns, notification, CancellationToken.None); - }, CancellationToken.None); + }, CancellationToken.None)); } /// public void Publish(TNotification notification) where TNotification : INotification { - _ = Task.Run(async () => + ObserveTask(Task.Run(async () => { using var scope = _scopeFactory.CreateScope(); var mediator = scope.ServiceProvider.GetRequiredService(); await mediator.Publish(notification, CancellationToken.None); - }, CancellationToken.None); + }, CancellationToken.None)); } /// public void Publish(MediatorNamespace ns, TNotification notification) where TNotification : INotification { - _ = Task.Run(async () => + ObserveTask(Task.Run(async () => { using var scope = _scopeFactory.CreateScope(); var mediator = scope.ServiceProvider.GetRequiredService(); await mediator.Publish(ns, notification, CancellationToken.None); - }, CancellationToken.None); + }, CancellationToken.None)); + } + + /// + /// Observes faulted fire-and-forget tasks so they do not trigger + /// and crash the process. + /// + private static void ObserveTask(Task task) + { + task.ContinueWith( + static t => GC.KeepAlive(t.Exception), + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); } } diff --git a/src/Mediator.DependencyInjection/Generic/Handlers/GenericNotificationHandler.cs b/src/Mediator.DependencyInjection/Generic/Handlers/GenericNotificationHandler.cs index 3e39782..957f291 100644 --- a/src/Mediator.DependencyInjection/Generic/Handlers/GenericNotificationHandler.cs +++ b/src/Mediator.DependencyInjection/Generic/Handlers/GenericNotificationHandler.cs @@ -24,6 +24,13 @@ internal interface IHandlerRegistration { INotificationCache Owner { get; } + /// + /// Set to true under when the + /// handler is disposed. Checked (without lock) as a best-effort guard in + /// to skip already-disposed handlers. + /// + bool IsDisposed { get; set; } + ValueTask InvokeAsync(IServiceProvider provider, object notification, CancellationToken cancellationToken); } @@ -53,7 +60,8 @@ public GenericNotificationCache(IEnumerable reg } } - public List? HandlerTypes { get; set; } + private List? _handlerTypes; + public List? HandlerTypes { get => Volatile.Read(ref _handlerTypes); set => Volatile.Write(ref _handlerTypes, value); } public List MatchingRegistrations { get; } diff --git a/src/Mediator.DependencyInjection/Generic/Handlers/GenericRequestHandler.cs b/src/Mediator.DependencyInjection/Generic/Handlers/GenericRequestHandler.cs index 50b8b5b..87ec1f3 100644 --- a/src/Mediator.DependencyInjection/Generic/Handlers/GenericRequestHandler.cs +++ b/src/Mediator.DependencyInjection/Generic/Handlers/GenericRequestHandler.cs @@ -43,7 +43,8 @@ public GenericRequestCache(IEnumerable registrations } } - public Type? RequestHandlerType { get; set; } + private Type? _requestHandlerType; + public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); } public List MatchingRegistrations { get; } } @@ -67,7 +68,8 @@ public GenericRequestCache(IEnumerable registrations } } - public Type? RequestHandlerType { get; set; } + private Type? _requestHandlerType; + public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); } public List MatchingRegistrations { get; } } diff --git a/src/Mediator.DependencyInjection/Generic/Handlers/GenericStreamRequestHandler.cs b/src/Mediator.DependencyInjection/Generic/Handlers/GenericStreamRequestHandler.cs index 6dc786a..de5a138 100644 --- a/src/Mediator.DependencyInjection/Generic/Handlers/GenericStreamRequestHandler.cs +++ b/src/Mediator.DependencyInjection/Generic/Handlers/GenericStreamRequestHandler.cs @@ -41,7 +41,8 @@ public GenericStreamRequestCache(IEnumerable r } } - public Type? RequestHandlerType { get; set; } + private Type? _requestHandlerType; + public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); } public List MatchingRegistrations { get; } } diff --git a/src/Mediator.DependencyInjection/Generic/NotificationAttributeHandler.cs b/src/Mediator.DependencyInjection/Generic/NotificationAttributeHandler.cs index 0ce731b..0030238 100644 --- a/src/Mediator.DependencyInjection/Generic/NotificationAttributeHandler.cs +++ b/src/Mediator.DependencyInjection/Generic/NotificationAttributeHandler.cs @@ -19,7 +19,13 @@ static NotificationAttributeHandler() if (attribute == null) continue; - var notificationType = method.GetParameters()[0].ParameterType; + var parameters = method.GetParameters(); + if (parameters.Length == 0) + { + throw new InvalidOperationException($"Method '{method.Name}' on '{typeof(T).FullName}' is decorated with [NotificationHandler] but has no parameters. The first parameter must be the notification type."); + } + + var notificationType = parameters[0].ParameterType; if (!typeof(INotification).IsAssignableFrom(notificationType)) { @@ -45,8 +51,14 @@ public static IDisposable RegisterHandlers(IServiceProvider serviceProvider, obj var registration = new HandlerRegistration(cache, invoker, handler, middleware); cache.Lock.Wait(); - cache.Registrations.Add(registration); - cache.Lock.Release(); + try + { + cache.Registrations.Add(registration); + } + finally + { + cache.Lock.Release(); + } registrations.Add(registration); } @@ -70,8 +82,15 @@ public void Dispose() var owner = registration.Owner; owner.Lock.Wait(); - owner.Registrations.Remove(registration); - owner.Lock.Release(); + try + { + registration.IsDisposed = true; + owner.Registrations.Remove(registration); + } + finally + { + owner.Lock.Release(); + } } } } @@ -97,7 +116,14 @@ public HandlerRegistration( public Func Invoke { get; } - public bool IsDisposed { get; set; } + private bool _isDisposed; + + public bool IsDisposed + { + get => Volatile.Read(ref _isDisposed); + // Written under Owner.Lock, but Volatile.Write ensures visibility for lock-free reads in InvokeAsync + set => Volatile.Write(ref _isDisposed, value); + } public ValueTask InvokeAsync(IServiceProvider provider, object notification, CancellationToken cancellationToken) { diff --git a/src/Mediator.DependencyInjection/ServiceProviderMediator.cs b/src/Mediator.DependencyInjection/ServiceProviderMediator.cs index 38fe578..724acb4 100644 --- a/src/Mediator.DependencyInjection/ServiceProviderMediator.cs +++ b/src/Mediator.DependencyInjection/ServiceProviderMediator.cs @@ -79,7 +79,7 @@ public IDisposable RegisterNotificationHandler(object handler, Func, { invokeAsync = cb => { - _ = Task.Run(() => invoker(cb)); + _ = invoker(cb); return Task.CompletedTask; }; } diff --git a/src/Mediator.Hosting/MediatorExtensions.cs b/src/Mediator.Hosting/MediatorExtensions.cs index 8e0a38d..df88524 100644 --- a/src/Mediator.Hosting/MediatorExtensions.cs +++ b/src/Mediator.Hosting/MediatorExtensions.cs @@ -1,5 +1,6 @@ using System; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Zapto.Mediator.Options; using Zapto.Mediator.Services; @@ -16,7 +17,7 @@ public static IMediatorBuilder AddHostingBackgroundScheduler(this IMediatorBuild mediatorBuilder.Services.AddSingleton(); mediatorBuilder.Services.AddHostedService(); - mediatorBuilder.Services.AddSingleton(); + mediatorBuilder.Services.Replace(ServiceDescriptor.Singleton()); if (configure is not null) { diff --git a/src/Mediator.Hosting/Services/BackgroundQueueService.cs b/src/Mediator.Hosting/Services/BackgroundQueueService.cs index 4d7546b..6f274d8 100644 --- a/src/Mediator.Hosting/Services/BackgroundQueueService.cs +++ b/src/Mediator.Hosting/Services/BackgroundQueueService.cs @@ -2,10 +2,8 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Security.Cryptography; using System.Threading; using System.Threading.Tasks; -using MediatR; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -36,10 +34,10 @@ public void QueueBackgroundWorkItem(Func workItem, object notification) throw new OperationCanceledException("Cannot schedule work item since the application is stopping"); } - _workItems.Enqueue(workItem); - lock (_workers) { + _workItems.Enqueue(workItem); + if (_workers.Count < _options.Value.MaxDegreeOfParallelism) { var worker = new Worker @@ -47,7 +45,11 @@ public void QueueBackgroundWorkItem(Func workItem, object notification) Notification = notification }; - worker.Task = Task.Factory.StartNew(() => ProcessBackgroundWorkItem(worker), TaskCreationOptions.LongRunning); + worker.Task = Task.Factory.StartNew( + () => ProcessBackgroundWorkItem(worker), + CancellationToken.None, + TaskCreationOptions.None, + TaskScheduler.Default).Unwrap(); _workers.Add(worker); } } @@ -93,9 +95,13 @@ public async Task WaitForBackgroundTasksAsync(CancellationToken cancellationToke resultingTask = await Task.WhenAny(task, tcs.Task); #endif } + catch (OperationCanceledException) + { + throw; + } catch { - // ignore + // worker exceptions are already logged at the source; ignore here } #if !NET @@ -127,6 +133,7 @@ private async Task ProcessBackgroundWorkItem(Worker worker) // Process next work item if (!_workItems.TryDequeue(out var workItem)) { + await Task.Yield(); continue; } diff --git a/src/Mediator.SourceGenerator/SenderGenerator.cs b/src/Mediator.SourceGenerator/SenderGenerator.cs index 7aa8724..00e633b 100644 --- a/src/Mediator.SourceGenerator/SenderGenerator.cs +++ b/src/Mediator.SourceGenerator/SenderGenerator.cs @@ -14,7 +14,7 @@ namespace Zapto.Mediator.Generator; [Generator] public class SenderGenerator : IIncrementalGenerator { - private bool _generateAssemblyInfo; + private readonly bool _generateAssemblyInfo; private static readonly string[] Interfaces = [ diff --git a/src/Mediator/Exceptions/HandlerNotFoundException.cs b/src/Mediator/Exceptions/HandlerNotFoundException.cs index bdf4099..d58f912 100644 --- a/src/Mediator/Exceptions/HandlerNotFoundException.cs +++ b/src/Mediator/Exceptions/HandlerNotFoundException.cs @@ -1,5 +1,4 @@ using System; -using System.Runtime.Serialization; namespace Zapto.Mediator; @@ -9,10 +8,6 @@ public HandlerNotFoundException() { } - protected HandlerNotFoundException(SerializationInfo info, StreamingContext context) : base(info, context) - { - } - public HandlerNotFoundException(string message) : base(message) { } diff --git a/src/Mediator/Exceptions/NamespaceHandlerNotFoundException.cs b/src/Mediator/Exceptions/NamespaceHandlerNotFoundException.cs index 184a9aa..cdd9eab 100644 --- a/src/Mediator/Exceptions/NamespaceHandlerNotFoundException.cs +++ b/src/Mediator/Exceptions/NamespaceHandlerNotFoundException.cs @@ -1,5 +1,4 @@ using System; -using System.Runtime.Serialization; namespace Zapto.Mediator; @@ -9,10 +8,6 @@ public NamespaceHandlerNotFoundException() { } - protected NamespaceHandlerNotFoundException(SerializationInfo info, StreamingContext context) : base(info, context) - { - } - public NamespaceHandlerNotFoundException(string message) : base(message) { } diff --git a/tests/Mediator.DependencyInjection.Tests/NotificationTest.cs b/tests/Mediator.DependencyInjection.Tests/NotificationTest.cs index 0146144..e369bb1 100644 --- a/tests/Mediator.DependencyInjection.Tests/NotificationTest.cs +++ b/tests/Mediator.DependencyInjection.Tests/NotificationTest.cs @@ -119,21 +119,89 @@ public async Task TestTemporaryHandler(Type type) var handler = (ITestNotificationHandler) Activator.CreateInstance(type)!; var mediator = serviceProvider.GetRequiredService(); + // Before registration – handler must not be called await mediator.Publish((object) new Notification()); - Assert.Equal(0, handler.Count); + // Register and publish – handler is invoked var disposable = mediator.RegisterNotificationHandler(handler); await mediator.Publish((object) new Notification()); - Assert.Equal(1, handler.Count); + // Dispose and publish again – handler must not be called a second time disposable.Dispose(); await mediator.Publish((object) new Notification()); - Assert.Equal(1, handler.Count); } + [Theory] + [InlineData(typeof(ValueTaskNotificationHandler))] + [InlineData(typeof(TaskNotificationHandler))] + [InlineData(typeof(VoidNotificationHandler))] + public async Task TestTemporaryHandlerDisposedBeforeInvocationIsIgnored(Type type) + { + // Verifies the IsDisposed guard: when a handler with a deferred invoker is disposed + // before the callback fires, InvokeAsync should be a no-op. + var tcs = new TaskCompletionSource(); + + var serviceProvider = new ServiceCollection() + .AddMediator(_ => { }) + .BuildServiceProvider(); + + var handler = (ITestNotificationHandler) Activator.CreateInstance(type)!; + var mediator = serviceProvider.GetRequiredService(); + + // Register with a deferred invoker so the IsDisposed path in HandlerRegistration.InvokeAsync is exercised + var disposable = mediator.RegisterNotificationHandler(handler, cb => tcs.Task.ContinueWith(_ => cb()), queue: false); + + // Start publish without awaiting — the middleware blocks on tcs.Task + var publishTask = mediator.Publish((object) new Notification()).AsTask(); + + // Dispose before the deferred callback fires + disposable.Dispose(); + + // Trigger the pending callback now – IsDisposed is true so the handler must not be called + tcs.SetResult(true); + + // Await the publish (now unblocked) and give continuations time to settle + await publishTask; + await Task.Delay(50); + + Assert.Equal(0, handler.Count); + } + + [Fact] + public async Task QueuedHandlerDoesNotBlockPublishEvenWhenNeverCompleting() + { + // Simulates the Blazor shutdown scenario: a handler registered with queue: true + // that never completes must not block Publish or prevent disposal. + var tcs = new TaskCompletionSource(); + + var serviceProvider = new ServiceCollection() + .AddMediator(_ => { }) + .BuildServiceProvider(); + + var handler = new ValueTaskNotificationHandler(); + var mediator = serviceProvider.GetRequiredService(); + + // invoker that returns a never-completing task (simulating Blazor's InvokeAsync with a stuck handler) + var disposable = mediator.RegisterNotificationHandler( + handler, + _ => tcs.Task, // blocks until tcs is completed + queue: true); + + // Publish must return immediately because queue: true wraps the invoker as fire-and-forget + var publishTask = mediator.Publish((object) new Notification()); + var completed = publishTask.IsCompleted || await Task.WhenAny(publishTask.AsTask(), Task.Delay(1000)) == publishTask.AsTask(); + Assert.True(completed, "Publish should return immediately with queue: true, even when the handler never completes"); + + // Disposal must succeed without blocking + disposable.Dispose(); + + // Complete the stuck task so it doesn't prevent the test process from exiting + tcs.SetCanceled(); + } + [Fact] public async Task BackgroundPublisher() { diff --git a/tests/Mediator.Hosting.Tests/BackgroundNotificationQueueTest.cs b/tests/Mediator.Hosting.Tests/BackgroundNotificationQueueTest.cs index ca54b74..2956796 100644 --- a/tests/Mediator.Hosting.Tests/BackgroundNotificationQueueTest.cs +++ b/tests/Mediator.Hosting.Tests/BackgroundNotificationQueueTest.cs @@ -252,4 +252,89 @@ public async Task StopNotification() mediator.Publish(new Notification()); Assert.True(cancelled.Value); } + + [Fact] + public async Task WaitForBackgroundTasksCancelledWhenTokenCancelled() + { + using var cts = new CancellationTokenSource(); + + var tcs = new TaskCompletionSource(); + + var host = Host.CreateEmptyApplicationBuilder(new HostApplicationBuilderSettings()); + + host.Services.AddMediator(b => + { + b.AddNotificationHandler(async (Notification _) => await tcs.Task); + b.AddHostingBackgroundScheduler(); + }); + + var app = host.Build(); + + await app.StartAsync(); + + var backgroundQueue = app.Services.GetRequiredService(); + var mediator = app.Services.GetRequiredService(); + + mediator.Publish(new Notification()); + + // Cancel before the handler finishes + cts.Cancel(); + + await Assert.ThrowsAnyAsync( + () => backgroundQueue.WaitForBackgroundTasksAsync(cts.Token)); + + tcs.SetResult(true); + + await app.StopAsync(); + } + + [Fact] + public async Task StopAsyncCompletesWhenHandlerIsStuck() + { + // A handler that never completes should not prevent the host from stopping. + // StopAsync passes a cancellation token that fires after HostOptions.ShutdownTimeout. + // We configure a short timeout to keep the test fast. + var tcs = new TaskCompletionSource(); + + var host = Host.CreateEmptyApplicationBuilder(new HostApplicationBuilderSettings()); + + host.Services.Configure(o => o.ShutdownTimeout = TimeSpan.FromSeconds(2)); + host.Services.AddMediator(b => + { + b.AddNotificationHandler(async (Notification _) => await tcs.Task); // never completes + b.AddHostingBackgroundScheduler(); + }); + + var app = host.Build(); + await app.StartAsync(); + + var mediator = app.Services.GetRequiredService(); + mediator.Publish(new Notification()); + + // StopAsync should complete within the shutdown timeout, not hang forever + var stopTask = app.StopAsync(); + var finished = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(10))) == stopTask; + Assert.True(finished, "StopAsync should complete within the shutdown timeout even when a handler is stuck"); + + // Clean up the stuck handler so it doesn't leak + tcs.SetResult(true); + } + + [Fact] + public async Task AddHostingBackgroundSchedulerReplacesDefaultPublisher() + { + var host = Host.CreateEmptyApplicationBuilder(new HostApplicationBuilderSettings()); + + host.Services.AddMediator(b => + { + b.AddHostingBackgroundScheduler(); + }); + + var app = host.Build(); + + // There must be exactly one IBackgroundPublisher registration and it must be BackgroundPublisher, + // not DefaultBackgroundPublisher (which AddMediator registers as a fallback). + var publisher = app.Services.GetRequiredService(); + Assert.Equal("BackgroundPublisher", publisher.GetType().Name); + } }