Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>

<PropertyGroup>
<Version>1.0.0-alpha.24</Version>
<PackageVersion>1.0.0-alpha.24</PackageVersion>
<Version>1.0.0-alpha.25</Version>
<PackageVersion>1.0.0-alpha.25</PackageVersion>
<Authors>Zapto</Authors>
<RepositoryUrl>https://github.com/zapto-dev/Mediator</RepositoryUrl>
<Copyright>Copyright © 2025 Zapto</Copyright>
Expand Down
29 changes: 21 additions & 8 deletions src/Mediator.DependencyInjection/DefaultBackgroundPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,62 @@ public DefaultBackgroundPublisher(IServiceScopeFactory scopeFactory)
/// <inheritdoc />
public void Publish(object notification)
{
_ = Task.Run(async () =>
ObserveTask(Task.Run(async () =>
{
using var scope = _scopeFactory.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();

await mediator.Publish(notification, CancellationToken.None);
}, CancellationToken.None);
}, CancellationToken.None));
}

/// <inheritdoc />
public void Publish(MediatorNamespace ns, object notification)
{
_ = Task.Run(async () =>
ObserveTask(Task.Run(async () =>
{
using var scope = _scopeFactory.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();

await mediator.Publish(ns, notification, CancellationToken.None);
}, CancellationToken.None);
}, CancellationToken.None));
}

/// <inheritdoc />
public void Publish<TNotification>(TNotification notification) where TNotification : INotification
{
_ = Task.Run(async () =>
ObserveTask(Task.Run(async () =>
{
using var scope = _scopeFactory.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();

await mediator.Publish(notification, CancellationToken.None);
}, CancellationToken.None);
}, CancellationToken.None));
}

/// <inheritdoc />
public void Publish<TNotification>(MediatorNamespace ns, TNotification notification)
where TNotification : INotification
{
_ = Task.Run(async () =>
ObserveTask(Task.Run(async () =>
{
using var scope = _scopeFactory.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();

await mediator.Publish(ns, notification, CancellationToken.None);
}, CancellationToken.None);
}, CancellationToken.None));
}

/// <summary>
/// Observes faulted fire-and-forget tasks so they do not trigger
/// <see cref="TaskScheduler.UnobservedTaskException"/> and crash the process.
/// </summary>
private static void ObserveTask(Task task)
{
task.ContinueWith(
static t => GC.KeepAlive(t.Exception),
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ internal interface IHandlerRegistration
{
INotificationCache Owner { get; }

/// <summary>
/// Set to <c>true</c> under <see cref="INotificationCache.Lock"/> when the
/// handler is disposed. Checked (without lock) as a best-effort guard in
/// <see cref="InvokeAsync"/> to skip already-disposed handlers.
/// </summary>
bool IsDisposed { get; set; }

ValueTask InvokeAsync(IServiceProvider provider, object notification, CancellationToken cancellationToken);
}

Expand Down Expand Up @@ -53,7 +60,8 @@ public GenericNotificationCache(IEnumerable<GenericNotificationRegistration> reg
}
}

public List<Type>? HandlerTypes { get; set; }
private List<Type>? _handlerTypes;
public List<Type>? HandlerTypes { get => Volatile.Read(ref _handlerTypes); set => Volatile.Write(ref _handlerTypes, value); }

public List<GenericNotificationRegistration> MatchingRegistrations { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public GenericRequestCache(IEnumerable<GenericRequestRegistration> registrations
}
}

public Type? RequestHandlerType { get; set; }
private Type? _requestHandlerType;
public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); }

public List<GenericRequestRegistration> MatchingRegistrations { get; }
}
Expand All @@ -67,7 +68,8 @@ public GenericRequestCache(IEnumerable<GenericRequestRegistration> registrations
}
}

public Type? RequestHandlerType { get; set; }
private Type? _requestHandlerType;
public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); }

public List<GenericRequestRegistration> MatchingRegistrations { get; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public GenericStreamRequestCache(IEnumerable<GenericStreamRequestRegistration> r
}
}

public Type? RequestHandlerType { get; set; }
private Type? _requestHandlerType;
public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); }

public List<GenericStreamRequestRegistration> MatchingRegistrations { get; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ static NotificationAttributeHandler()

if (attribute == null) continue;

var notificationType = method.GetParameters()[0].ParameterType;
var parameters = method.GetParameters();
if (parameters.Length == 0)
{
throw new InvalidOperationException($"Method '{method.Name}' on '{typeof(T).FullName}' is decorated with [NotificationHandler] but has no parameters. The first parameter must be the notification type.");
}

var notificationType = parameters[0].ParameterType;

if (!typeof(INotification).IsAssignableFrom(notificationType))
{
Expand All @@ -45,8 +51,14 @@ public static IDisposable RegisterHandlers(IServiceProvider serviceProvider, obj
var registration = new HandlerRegistration(cache, invoker, handler, middleware);

cache.Lock.Wait();
cache.Registrations.Add(registration);
cache.Lock.Release();
try
{
cache.Registrations.Add(registration);
}
finally
{
cache.Lock.Release();
}

registrations.Add(registration);
}
Expand All @@ -70,8 +82,15 @@ public void Dispose()
var owner = registration.Owner;

owner.Lock.Wait();
owner.Registrations.Remove(registration);
owner.Lock.Release();
try
{
registration.IsDisposed = true;
owner.Registrations.Remove(registration);
}
finally
{
owner.Lock.Release();
}
}
}
}
Expand All @@ -97,7 +116,14 @@ public HandlerRegistration(

public Func<T, IServiceProvider, object, CancellationToken, ValueTask> Invoke { get; }

public bool IsDisposed { get; set; }
private bool _isDisposed;

public bool IsDisposed
{
get => Volatile.Read(ref _isDisposed);
// Written under Owner.Lock, but Volatile.Write ensures visibility for lock-free reads in InvokeAsync
set => Volatile.Write(ref _isDisposed, value);
}

public ValueTask InvokeAsync(IServiceProvider provider, object notification, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
{
invokeAsync = cb =>
{
_ = Task.Run(() => invoker(cb));
_ = invoker(cb);
return Task.CompletedTask;
};
}
Expand All @@ -89,7 +89,7 @@
}
}

return (IDisposable) typeof(NotificationAttributeHandler<>).MakeGenericType(handler.GetType())

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test

Possible null reference return.

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test

Converting null literal or possible null value to non-nullable type.

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test (net10.0)

Possible null reference return.

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test (net10.0)

Converting null literal or possible null value to non-nullable type.

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test (net9.0)

Possible null reference return.

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test (net9.0)

Converting null literal or possible null value to non-nullable type.

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test (net8.0)

Possible null reference return.

Check warning on line 92 in src/Mediator.DependencyInjection/ServiceProviderMediator.cs

View workflow job for this annotation

GitHub Actions / test (net8.0)

Converting null literal or possible null value to non-nullable type.
.GetMethod(nameof(NotificationAttributeHandler<object>.RegisterHandlers), BindingFlags.Static | BindingFlags.Public)!
.Invoke(null, new[] { _provider, handler, invokeAsync });
}
Expand Down
3 changes: 2 additions & 1 deletion src/Mediator.Hosting/MediatorExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Zapto.Mediator.Options;
using Zapto.Mediator.Services;

Expand All @@ -16,7 +17,7 @@ public static IMediatorBuilder AddHostingBackgroundScheduler(this IMediatorBuild

mediatorBuilder.Services.AddSingleton<BackgroundQueueService>();
mediatorBuilder.Services.AddHostedService<BackgroundQueueHostedService>();
mediatorBuilder.Services.AddSingleton<IBackgroundPublisher, BackgroundPublisher>();
mediatorBuilder.Services.Replace(ServiceDescriptor.Singleton<IBackgroundPublisher, BackgroundPublisher>());

if (configure is not null)
{
Expand Down
19 changes: 13 additions & 6 deletions src/Mediator.Hosting/Services/BackgroundQueueService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -36,18 +34,22 @@ public void QueueBackgroundWorkItem(Func<Task> workItem, object notification)
throw new OperationCanceledException("Cannot schedule work item since the application is stopping");
}

_workItems.Enqueue(workItem);

lock (_workers)
{
_workItems.Enqueue(workItem);

if (_workers.Count < _options.Value.MaxDegreeOfParallelism)
{
var worker = new Worker
{
Notification = notification
};

worker.Task = Task.Factory.StartNew(() => ProcessBackgroundWorkItem(worker), TaskCreationOptions.LongRunning);
worker.Task = Task.Factory.StartNew(
() => ProcessBackgroundWorkItem(worker),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default).Unwrap();
_workers.Add(worker);
}
}
Expand Down Expand Up @@ -93,9 +95,13 @@ public async Task WaitForBackgroundTasksAsync(CancellationToken cancellationToke
resultingTask = await Task.WhenAny(task, tcs.Task);
#endif
}
catch (OperationCanceledException)
{
throw;
}
catch
{
// ignore
// worker exceptions are already logged at the source; ignore here
}

#if !NET
Expand Down Expand Up @@ -127,6 +133,7 @@ private async Task ProcessBackgroundWorkItem(Worker worker)
// Process next work item
if (!_workItems.TryDequeue(out var workItem))
{
await Task.Yield();
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Mediator.SourceGenerator/SenderGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Zapto.Mediator.Generator;
[Generator]
public class SenderGenerator : IIncrementalGenerator
{
private bool _generateAssemblyInfo;
private readonly bool _generateAssemblyInfo;

private static readonly string[] Interfaces =
[
Expand Down
5 changes: 0 additions & 5 deletions src/Mediator/Exceptions/HandlerNotFoundException.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Runtime.Serialization;

namespace Zapto.Mediator;

Expand All @@ -9,10 +8,6 @@ public HandlerNotFoundException()
{
}

protected HandlerNotFoundException(SerializationInfo info, StreamingContext context) : base(info, context)
{
}

public HandlerNotFoundException(string message) : base(message)
{
}
Expand Down
5 changes: 0 additions & 5 deletions src/Mediator/Exceptions/NamespaceHandlerNotFoundException.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Runtime.Serialization;

namespace Zapto.Mediator;

Expand All @@ -9,10 +8,6 @@ public NamespaceHandlerNotFoundException()
{
}

protected NamespaceHandlerNotFoundException(SerializationInfo info, StreamingContext context) : base(info, context)
{
}

public NamespaceHandlerNotFoundException(string message) : base(message)
{
}
Expand Down
Loading
Loading