XpressWork

A lightweight, high-performance, thread-safe background work queue manager and dispatcher for .NET.


Features

  • 🚀 High Performance: Lock-free operations with efficient channel-based queuing
  • 🧵 Thread-Safe: All operations are thread-safe with proper scope isolation
  • 🔒 Safe Background Execution: No scoped service leaks across async boundaries
  • 🔄 Context Capture & Rehydration: Automatically captures and restores context (user claims, tenant ID, etc.)
  • ⚙️ Flexible Handler Pipeline: Multiple handlers can process the same work type with ordered execution
  • ⏱️ Rate Control: Built-in debounce and throttle scheduling for controlling work execution timing
  • 📊 Observable: Structured logging at all execution boundaries
  • 🎯 Multi-Targeting: Supports .NET 6.0 through .NET 10.0
  • 📚 Fully Documented: Comprehensive XML documentation for all public APIs

What XpressWork Is

XpressWork provides safe, in-memory background work execution for .NET applications. It ensures:

  • Background work executes in isolated DI scopes
  • Request context (user identity, tenant, etc.) is captured and restored
  • Multiple handlers can process work items in deterministic order
  • Proper cancellation and exception handling
  • Rate-controlled scheduling with debounce and throttle patterns

What XpressWork Is NOT

XpressWork does not provide:

  • Persistent job storage across restarts
  • Distributed processing
  • Retry/backoff policies (use Hangfire, Quartz.NET, or MassTransit for these)
  • Exactly-once execution guarantees

Installation

Install via NuGet Package Manager:

dotnet add package XpressWork

Or via Package Manager Console:

Install-Package XpressWork

Quick Start

1. Define Your Context

public record MyAppContext
{
    // Nullable: the claim or header may be missing on a given request
    public string? UserId { get; init; }
    public string? TenantId { get; init; }
}

2. Implement Your Scope Factory

public class MyAppScopeFactory : BackgroundWorkScopeFactoryBase<MyAppContext>
{
    public MyAppScopeFactory(
        ILogger<BackgroundWorkScopeFactoryBase<MyAppContext>> logger,
        IServiceScopeFactory serviceScopeFactory)
        : base(logger, serviceScopeFactory) { }

    public override MyAppContext CaptureScopeContext(IServiceProvider scopedProvider)
    {
        var httpContext = scopedProvider.GetRequiredService<IHttpContextAccessor>().HttpContext!;
        return new MyAppContext
        {
            UserId = httpContext.User.FindFirst(ClaimTypes.NameIdentifier)?.Value,
            TenantId = httpContext.Request.Headers["X-Tenant-Id"].FirstOrDefault()
        };
    }

    public override void RehydrateBackgroundScope(IServiceProvider scopedProvider, MyAppContext context)
    {
        scopedProvider.GetRequiredService<IBackgroundWorkContextAccessor<MyAppContext>>().Context = context;
    }
}

3. Create Work Arguments and Handler

public record SendEmailArgs(string To, string Subject, string Body);

public class SendEmailHandler : IWorkHandler<SendEmailArgs>
{
    private readonly IEmailService _emailService;
    private readonly IBackgroundWorkContextAccessor<MyAppContext> _contextAccessor;
    
    public int Order => 100;

    public SendEmailHandler(
        IEmailService emailService,
        IBackgroundWorkContextAccessor<MyAppContext> contextAccessor)
    {
        _emailService = emailService;
        _contextAccessor = contextAccessor;
    }

    public async Task DoWork(SendEmailArgs args, CancellationToken cancellationToken)
    {
        // Context is available - rehydrated from the original request
        var userId = _contextAccessor.Context?.UserId;
        await _emailService.SendAsync(args.To, args.Subject, args.Body, userId, cancellationToken);
    }
}

4. Register Services

services.AddXpressWork<MyAppContext, MyAppScopeFactory>(options =>
{
    options.MaxQueueLength = 1000;
    options.DrainOnShutdown = true;
});

services.AddWorkHandler<SendEmailArgs, SendEmailHandler>();

5. Enqueue Work

public class OrderController : ControllerBase
{
    private readonly IBackgroundWorkSubmitter<MyAppContext> _submitter;

    public OrderController(IBackgroundWorkSubmitter<MyAppContext> submitter)
    {
        _submitter = submitter;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
    {
        // Create the order...
        var order = await CreateOrderAsync(request);

        // Enqueue background work - context is automatically captured
        await _submitter.Enqueue(new SendEmailArgs(
            order.CustomerEmail,
            "Order Confirmation",
            $"Your order {order.Id} has been confirmed."));

        return Ok(order);
    }
}

Simple Fire-and-Forget (No Custom Context)

// Use the simple overload when context isn't needed
services.AddXpressWork();
services.AddBackgroundActionWorkHandler();

// Enqueue a delegate
await submitter.Enqueue(new BackgroundActionWorkArguments(
    async (sp, ct) =>
    {
        var service = sp.GetRequiredService<IMyService>();
        await service.DoWorkAsync(ct);
    },
    Name: "ProcessOrder"));

Rate-Controlled Scheduling

XpressWork includes a scheduling layer for rate-controlling when work is submitted to the queue. This is useful for scenarios where you need to coalesce rapid events or limit execution frequency.

Enable Scheduling

services.AddXpressWork<MyAppContext, MyAppScopeFactory>();
services.AddXpressWorkScheduling<MyAppContext>();

Debounce Pattern

Execute work only after a period of inactivity. Useful for auto-save scenarios where you want to save only after the user stops typing.

public class PreferencesService
{
    private readonly IBackgroundWorkScheduler<MyAppContext> _scheduler;

    public PreferencesService(IBackgroundWorkScheduler<MyAppContext> scheduler)
    {
        _scheduler = scheduler;
    }

    public async Task SavePreferencesAsync(Guid userId, UserPreferences prefs)
    {
        // Only save after 500ms of inactivity
        // Multiple rapid calls reset the timer
        await _scheduler.Debounce(
            interval: TimeSpan.FromMilliseconds(500),
            key: userId,  // Unique key per user
            workArgument: new SavePreferencesArgs { UserId = userId, Preferences = prefs });
    }
}
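The "reset the timer" behavior can be illustrated without the library. The sketch below is not XpressWork's implementation; `SimpleDebouncer` and `DebounceAsync` are hypothetical names, and the per-key version counter is just one simple way to express the idea that only the last call in a burst runs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical illustration of debounce semantics; not part of XpressWork.
public sealed class SimpleDebouncer<TKey> where TKey : notnull
{
    // Latest call number per key. Entries are never removed in this sketch.
    private readonly ConcurrentDictionary<TKey, long> _versions = new();

    // Returns true if this call's work ran, false if a newer call superseded it.
    public async Task<bool> DebounceAsync(TKey key, TimeSpan interval, Func<Task> work)
    {
        // Each call takes the next version number for its key.
        long myVersion = _versions.AddOrUpdate(key, 1, (_, v) => v + 1);

        // Wait out the quiet period.
        await Task.Delay(interval);

        // Run only if no newer call arrived for this key while waiting.
        if (!_versions.TryGetValue(key, out var current) || current != myVersion)
        {
            return false;
        }

        await work();
        return true;
    }
}
```

Calling `DebounceAsync` three times in quick succession with the same key runs the work once, for the third call, roughly one interval after that call.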

Throttle Pattern

Execute work at most once per interval. Useful for sensor data or metrics where you want regular updates regardless of input frequency.

public class SensorService
{
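    private readonly IBackgroundWorkScheduler<MyAppContext> _scheduler;

    // Inject the scheduler; without this constructor the field is never assigned
    public SensorService(IBackgroundWorkScheduler<MyAppContext> scheduler)
    {
        _scheduler = scheduler;
    }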

    public async Task ProcessTemperatureAsync(Guid sensorId, double temperature)
    {
        // Process at most once per second, using the latest reading
        await _scheduler.Throttle(
            interval: TimeSpan.FromSeconds(1),
            key: sensorId,
            workArgument: new ProcessTemperatureArgs { SensorId = sensorId, Temperature = temperature });
    }
}
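To make "at most once per interval, using the latest reading" concrete, here is a minimal stand-alone sketch. It is not the library's implementation; `SimpleThrottler` and `ThrottleAsync` are hypothetical names, and the choice to run at the end of the window (trailing edge) is an assumption made for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration of throttle semantics; not part of XpressWork.
public sealed class SimpleThrottler<TKey, TValue> where TKey : notnull
{
    private readonly object _gate = new();
    private readonly Dictionary<TKey, TValue> _latest = new();

    // Returns true for the call that opened the window and ran the work,
    // false for calls that were coalesced into an already open window.
    public async Task<bool> ThrottleAsync(
        TKey key, TimeSpan interval, TValue value, Func<TValue, Task> work)
    {
        bool opensWindow;
        lock (_gate)
        {
            opensWindow = !_latest.ContainsKey(key);
            _latest[key] = value; // later calls overwrite the pending value
        }

        if (!opensWindow)
        {
            return false;
        }

        await Task.Delay(interval);

        TValue toProcess;
        lock (_gate)
        {
            toProcess = _latest[key]; // the most recent value seen in this window
            _latest.Remove(key);      // close the window for this key
        }

        await work(toProcess);
        return true;
    }
}
```

Three rapid calls with readings 1, 2, and 3 for the same sensor produce a single invocation of `work`, with the value 3.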

Run-Once-Then Patterns

Execute immediately on first call, then apply rate limiting for subsequent calls.

// Execute first call immediately, then debounce subsequent calls
await _scheduler.RunOnceThenDebounce(
    interval: TimeSpan.FromMilliseconds(500),
    key: userId,
    workArgument: new SaveArgs { ... });

// Execute first call immediately, then throttle subsequent calls
await _scheduler.RunOnceThenThrottle(
    interval: TimeSpan.FromSeconds(1),
    key: sensorId,
    workArgument: new ProcessArgs { ... });

Cancellation and Flushing

// Cancel pending scheduled work
var (action, parameters) = _scheduler.Cancel<SavePreferencesArgs>(userId);

// Flush all pending work immediately (also called on shutdown)
await _scheduler.FlushAsync();

// Discard all pending work without execution
_scheduler.Clear();
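The difference between cancelling, flushing, and clearing can be shown with a small keyed store of pending work. This is a sketch under assumptions, not the scheduler's code: `PendingWorkStore` and its members are hypothetical names, and the real `Cancel` returns a tuple rather than a delegate.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical illustration of cancel/flush/clear semantics; not part of XpressWork.
public sealed class PendingWorkStore<TKey> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, Func<Task>> _pending = new();

    // Scheduling again with the same key replaces the earlier entry.
    public void Schedule(TKey key, Func<Task> work) => _pending[key] = work;

    // Cancel: remove one entry and hand it back without running it.
    public Func<Task>? Cancel(TKey key) =>
        _pending.TryRemove(key, out var work) ? work : null;

    // Flush: run everything that is still pending, then leave the store empty.
    public async Task<int> FlushAsync()
    {
        int ran = 0;
        foreach (var key in _pending.Keys) // Keys returns a snapshot
        {
            if (_pending.TryRemove(key, out var work))
            {
                await work();
                ran++;
            }
        }
        return ran;
    }

    // Clear: drop everything without running it.
    public void Clear() => _pending.Clear();
}
```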

Architecture Overview

Component                                   Lifetime    Responsibility
------------------------------------------  ----------  ---------------------------------------------
BackgroundWorkQueue<TContext>               Singleton   Owns channel, scheduling, execution
IBackgroundWorkSubmitter<TContext>          Scoped      Captures caller scope & enqueues
IBackgroundWorkScheduler<TContext>          Singleton   Rate-controlled scheduling (debounce/throttle)
IBackgroundWorkScopeFactory<TContext>       Singleton   Capture + rehydrate logic
IBackgroundWorkContextAccessor<TContext>    Scoped      Holds rehydrated context
IWorkHandler<TArgs>                         Scoped      Executes work

Conceptual Architecture

┌──────────────────────────────────────────────────────────────┐
│                     Your Application                         │
├──────────────────────────────────────────────────────────────┤
│  IBackgroundWorkScheduler     │  IBackgroundWorkSubmitter    │
│  (rate-controlled scheduling) │  (immediate submission)      │
├──────────────────────────────────────────────────────────────┤
│                  IBackgroundWorkQueue                        │
│                  (execution pipeline)                        │
├──────────────────────────────────────────────────────────────┤
│  IWorkHandler<T>  │  IWorkHandler<T>  │  IWorkHandler<T>     │
│  (ordered execution in isolated scopes)                      │
└──────────────────────────────────────────────────────────────┘

Key Invariants

  1. Background execution never uses the original request/caller scope
  2. No scoped services are captured and reused across async boundaries
  3. Each work item executes in a new AsyncServiceScope
  4. Context is rehydrated before resolving handlers
  5. Handler ordering is deterministic and stable
  6. Scheduler flushes all pending work on graceful shutdown
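Invariants 3 and 4 can be pictured as a consumer loop over a channel. The sketch below uses only the base class library and is not XpressWork's code: `FakeScope` stands in for an `AsyncServiceScope`, and `WorkItem` and `WorkerLoop` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a DI scope; a real host would create an AsyncServiceScope.
public sealed class FakeScope : IAsyncDisposable
{
    private static int _nextId;
    public int Id { get; } = Interlocked.Increment(ref _nextId);
    public string Context { get; set; } = ""; // plays the role of the context accessor
    public ValueTask DisposeAsync() => default;
}

// A queued item carries only plain data: the captured context and the work delegate.
public sealed record WorkItem(string CapturedContext, Func<FakeScope, CancellationToken, Task> Work);

public static class WorkerLoop
{
    public static async Task RunAsync(ChannelReader<WorkItem> reader, CancellationToken ct)
    {
        await foreach (var item in reader.ReadAllAsync(ct))
        {
            // Invariant 3: a fresh scope per item, disposed when the iteration ends.
            await using var scope = new FakeScope();

            // Invariant 4: rehydrate the context before any handler code runs.
            scope.Context = item.CapturedContext;

            try
            {
                await item.Work(scope, ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // One failing item should not stop the loop.
                Console.Error.WriteLine($"Work item failed: {ex.Message}");
            }
        }
    }
}
```

Because the item holds only the captured context value and never the caller's scope, nothing from the original request scope can leak into the background execution.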

Configuration Options

services.AddXpressWork<MyAppContext, MyAppScopeFactory>(options =>
{
    options.MaxQueueLength = 1000;        // 0 = unbounded
    options.DrainOnShutdown = true;       // Process remaining items on shutdown
    options.HandlerErrorMode = HandlerErrorMode.StopOnFirst;  // or ContinueOnError
    options.EnableDiagnostics = true;     // Enable verbose logging
    options.ShutdownTimeout = TimeSpan.FromSeconds(30);
});
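One plausible reading of `MaxQueueLength` (0 = unbounded) in terms of `System.Threading.Channels` is sketched below. How XpressWork actually builds its channel is not stated here, so `QueueChannelFactory` is a hypothetical helper and the `Wait` full mode is an assumption.

```csharp
using System.Threading.Channels;

// Hypothetical helper; shows how a queue length option could map onto a channel.
public static class QueueChannelFactory
{
    public static Channel<T> Create<T>(int maxQueueLength)
    {
        // Assumption: 0 (or less) means "no limit".
        if (maxQueueLength <= 0)
        {
            return Channel.CreateUnbounded<T>(new UnboundedChannelOptions
            {
                SingleReader = true,  // one consumer loop
                SingleWriter = false  // many concurrent producers
            });
        }

        return Channel.CreateBounded<T>(new BoundedChannelOptions(maxQueueLength)
        {
            // Wait makes WriteAsync apply backpressure instead of dropping items.
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        });
    }
}
```

With a bounded channel in `Wait` mode, `TryWrite` returns false once the capacity is reached, while `WriteAsync` waits for the consumer to free a slot.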

Handler Ordering

Multiple handlers can process the same work arguments type. They execute in order based on their Order property:

public class FirstHandler : IWorkHandler<MyArgs>
{
    public int Order => 10;  // Runs first
    // ...
}

public class SecondHandler : IWorkHandler<MyArgs>
{
    public int Order => 20;  // Runs second
    // ...
}

Handlers with the same Order value are sorted deterministically by type name.
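The ordering rule amounts to a two-key sort. The following sketch shows one way to express it; `IOrderedHandler` is a hypothetical stand-in for the `Order` member of `IWorkHandler<TArgs>`, and using the full type name with an ordinal comparison is an assumption about how the tie-break is done.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the Order member of IWorkHandler<TArgs>.
public interface IOrderedHandler
{
    int Order { get; }
}

public sealed class AuditHandler : IOrderedHandler { public int Order => 20; }
public sealed class ZipHandler : IOrderedHandler { public int Order => 10; }
public sealed class EmailHandler : IOrderedHandler { public int Order => 10; }

public static class HandlerOrdering
{
    // Sort by Order, then break ties by type name with an ordinal comparison,
    // so the result depends on neither registration order nor culture.
    public static IReadOnlyList<IOrderedHandler> Sort(IEnumerable<IOrderedHandler> handlers) =>
        handlers
            .OrderBy(h => h.Order)
            .ThenBy(h => h.GetType().FullName, StringComparer.Ordinal)
            .ToList();
}
```

Given the three handlers above in any registration order, the result is `EmailHandler`, `ZipHandler`, `AuditHandler`.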

Best Practices

✅ DO

  • Keep work arguments immutable (use record types)
  • Resolve services inside handlers, not in work arguments
  • Use the scoped submitter from within a DI scope
  • Handle cancellation tokens properly in handlers
  • Use debounce for user input that triggers saves/updates
  • Use throttle for high-frequency events (sensors, metrics)

🚫 DON'T

  • Capture scoped services in work arguments
  • Store IServiceProvider or HttpContext in work arguments
  • Resolve the submitter from a singleton
  • Use XpressWork for work that requires persistence or distribution
  • Use very short debounce/throttle intervals (< 10ms)
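The first two items in each list come down to what a work-arguments type is allowed to hold. A short contrast, using hypothetical types (`OrdersDbContext`, `LeakyReceiptArgs`, `SendReceiptArgs`) that are not part of XpressWork:

```csharp
using System;

// Hypothetical scoped service, e.g. a per-request database context.
public sealed class OrdersDbContext { }

// DON'T: the argument drags a scoped service out of the request scope.
// By the time the handler runs, the request scope may already be disposed.
public record LeakyReceiptArgs(OrdersDbContext Db, Guid OrderId);

// DO: immutable data only. The handler resolves its own services
// from the fresh background scope and looks the order up by ID.
public record SendReceiptArgs(Guid OrderId, string CustomerEmail);
```

Records also give value equality, so two `SendReceiptArgs` instances with the same order ID and email compare equal.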

Thread Safety

All operations are thread-safe:

  • Multiple producers can enqueue concurrently
  • Context does not bleed across work items
  • Queue processing is sequential by default
  • Scheduler operations are thread-safe with concurrent dictionary storage
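The producer and consumer guarantees can be tried out with a plain channel. This sketch does not use XpressWork; `ProducerConsumerDemo` is a hypothetical name, and it shows only that many writers can feed one reader that handles items one at a time.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo: many concurrent producers, one sequential consumer.
public static class ProducerConsumerDemo
{
    public static async Task<(int Processed, int MaxConcurrency)> RunAsync(
        int producers, int itemsPerProducer)
    {
        var channel = Channel.CreateUnbounded<int>(
            new UnboundedChannelOptions { SingleReader = true });
        int processed = 0, inFlight = 0, maxConcurrency = 0;

        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                int now = Interlocked.Increment(ref inFlight);
                maxConcurrency = Math.Max(maxConcurrency, now);
                await Task.Yield(); // simulate asynchronous work
                processed++;
                Interlocked.Decrement(ref inFlight);
            }
        });

        // Each producer writes its own range of values concurrently.
        var producerTasks = Enumerable.Range(0, producers)
            .Select(p => Task.Run(async () =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    await channel.Writer.WriteAsync(p * itemsPerProducer + i);
                }
            }))
            .ToArray();

        await Task.WhenAll(producerTasks);
        channel.Writer.Complete(); // lets the consumer loop finish
        await consumer;
        return (processed, maxConcurrency);
    }
}
```

Running it with 4 producers and 100 items each processes all 400 items, and the observed concurrency in the consumer never exceeds 1.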

Examples

See the /doc folder for more examples.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

Russlan Kafri
Company: Digixoil

Support

For issues, questions, or suggestions:

  • Open an issue on GitHub
  • Check existing documentation in the /doc folder
  • Review XML documentation in source code

Made with ❤️ by Digixoil
