Release v1.0 #31
Conversation
Refactored background job dispatching to resolve dependencies from IServiceProvider instead of IServiceScopeFactory, and centralized CloudEventEnvelope parsing and data extraction into a new CloudEventEnvelopeHelper. Updated DaprJobExecutionBridge and JobDispatcher to use the helper for schema context and payload extraction, improving multi-tenant support and code reuse. Simplified unit of work handling and error management in job dispatching.
Refactor job dispatching and envelope handling logic
Reviewer's Guide

Refactors background job dispatching and Dapr execution to be fully scope- and schema-aware, ensures idempotent status transitions using explicit unit-of-work boundaries, centralizes CloudEvent envelope handling, and simplifies handler invocation to use an existing scoped service provider.

Sequence diagram for Dapr-triggered background job execution with schema and UoW handling (a condensed code sketch of the dispatch flow follows the diagram)

sequenceDiagram
participant Dapr as DaprRuntime
participant Bridge as DaprJobExecutionBridge
participant ScopeFactory as IServiceScopeFactory
participant Scope as IServiceScope
participant SP as IServiceProvider
participant Schema as ICurrentSchema
participant JobStore as IJobStore
participant Dispatcher as JobDispatcher
participant UowMgr as IUnitOfWorkManager
participant Uow1 as IUnitOfWork
participant Uow2 as IUnitOfWork
participant Invoker as IBackgroundJobInvoker
participant EnvHelper as CloudEventEnvelopeHelper
Dapr->>Bridge: ExecuteAsync(jobName, payload, cancellationToken)
Bridge->>ScopeFactory: CreateAsyncScope()
ScopeFactory-->>Bridge: Scope
Bridge->>Scope: ServiceProvider
Scope-->>Bridge: SP
Bridge->>EnvHelper: ExtractDataPayload(eventSerializer, payload, out envelope)
EnvHelper-->>Bridge: dataPayload, envelope
alt envelope has schema
Bridge->>SP: GetRequiredService~ICurrentSchema~()
SP-->>Bridge: Schema
Bridge->>Schema: Set(envelope.Schema)
end
Bridge->>SP: GetRequiredService~IJobStore~()
SP-->>Bridge: JobStore
Bridge->>JobStore: GetByJobNameAsync(jobName, cancellationToken)
JobStore-->>Bridge: jobInfo or null
alt jobInfo is null
Bridge->>Bridge: Log error and return
Bridge-->>Dapr: throw or complete
else jobInfo found
Bridge->>Dispatcher: DispatchAsync(jobInfo.Id, jobInfo.HandlerName, dataPayload, cancellationToken)
activate Dispatcher
Dispatcher->>ScopeFactory: CreateAsyncScope()
ScopeFactory-->>Dispatcher: Scope
Dispatcher->>Scope: ServiceProvider
Scope-->>Dispatcher: SP
Dispatcher->>EnvHelper: ExtractDataPayload(eventSerializer, jobPayload, out envelope)
EnvHelper-->>Dispatcher: argsPayload, envelope
alt envelope has schema
Dispatcher->>SP: GetRequiredService~ICurrentSchema~()
SP-->>Dispatcher: Schema
Dispatcher->>Schema: Set(envelope.Schema)
end
Dispatcher->>SP: GetRequiredService~IUnitOfWorkManager~()
SP-->>Dispatcher: UowMgr
Dispatcher->>SP: GetRequiredService~IJobStore~()
SP-->>Dispatcher: JobStore
Note over Dispatcher,Uow1: First UoW: idempotency and set Running
Dispatcher->>UowMgr: BeginRequiresNew(cancellationToken)
UowMgr-->>Dispatcher: Uow1
Dispatcher->>JobStore: GetAsync(jobId, cancellationToken)
JobStore-->>Dispatcher: jobInfo
Dispatcher->>Dispatcher: IsJobAlreadyProcessedAsync(JobStore, jobId, handlerName, cancellationToken)
alt already processed
Dispatcher->>Uow1: CommitAsync(cancellationToken)
Dispatcher-->>Bridge: return
else not processed
alt handlerName not found
Dispatcher->>Dispatcher: Log warning
Dispatcher->>JobStore: UpdateStatusAsync(jobId, Failed, clock.UtcNow, error, cancellationToken)
Dispatcher->>Uow1: CommitAsync(cancellationToken)
Dispatcher-->>Bridge: return
else handlerName found
Dispatcher->>JobStore: UpdateStatusAsync(jobId, Running, null, null, cancellationToken)
Dispatcher->>Uow1: CommitAsync(cancellationToken)
end
end
Note over Dispatcher,Uow2: Second UoW: handler execution and completion
Dispatcher->>UowMgr: BeginRequiresNew(cancellationToken)
UowMgr-->>Dispatcher: Uow2
Dispatcher->>BackgroundJobOptions: Resolve invoker by handlerName
BackgroundJobOptions-->>Dispatcher: Invoker
Dispatcher->>Invoker: InvokeAsync(SP, eventSerializer, argsPayload, cancellationToken)
Invoker->>SP: GetRequiredService~IBackgroundJobHandler~TArgs~~()
SP-->>Invoker: handler
Invoker->>handler: HandleAsync(args, cancellationToken)
handler-->>Invoker: complete
Dispatcher->>JobStore: UpdateStatusAsync(jobId, Completed, clock.UtcNow, null, cancellationToken)
Dispatcher->>Uow2: CommitAsync(cancellationToken)
deactivate Dispatcher
end
Bridge-->>Dapr: return
par cancellation or failure
alt OperationCanceledException
Dispatcher->>Dispatcher: Log cancellation
Dispatcher->>JobStore: MarkJobStatusAsync(UowMgr, JobStore, jobId, Cancelled, message, cancellationToken)
else other Exception
Dispatcher->>Dispatcher: Log failure
Dispatcher->>JobStore: MarkJobStatusAsync(UowMgr, JobStore, jobId, Failed, errorMessage, cancellationToken)
end
end
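For readers who prefer code to diagrams, here is a condensed sketch of the dispatch flow shown above. It is reconstructed only from the signatures that appear in this guide (field names follow the JobDispatcher constructor in the class diagram below); logging, the invoker lookup via BackgroundJobOptions, and some null checks are omitted, so treat it as an illustration rather than the actual source.

```csharp
// Sketch reconstructed from the diagrams above; not the actual JobDispatcher source.
public async Task DispatchAsync(Guid jobId, string handlerName,
    ReadOnlyMemory<byte> jobPayload, CancellationToken cancellationToken)
{
    await using var scope = scopeFactory.CreateAsyncScope();
    var provider = scope.ServiceProvider;

    // Unwrap the CloudEvent envelope (if any) and apply its schema to the scoped ICurrentSchema.
    var argsPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, jobPayload, out var envelope);
    if (!string.IsNullOrWhiteSpace(envelope?.Schema))
    {
        provider.GetRequiredService<ICurrentSchema>().Set(envelope!.Schema);
    }

    var uowManager = provider.GetRequiredService<IUnitOfWorkManager>();
    var jobStore = provider.GetRequiredService<IJobStore>();

    // Phase 1: idempotency check and transition to Running, committed in its own unit of work.
    var statusUow = await uowManager.BeginRequiresNew(cancellationToken);
    if (await IsJobAlreadyProcessedAsync(jobStore, jobId, handlerName, cancellationToken))
    {
        await statusUow.CommitAsync(cancellationToken);
        return;
    }
    await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running, null, null, cancellationToken);
    await statusUow.CommitAsync(cancellationToken);

    // Phase 2: handler execution and transition to Completed in a second unit of work.
    try
    {
        var handlerUow = await uowManager.BeginRequiresNew(cancellationToken);
        await InvokeHandlerAsync(provider, handlerName, argsPayload, cancellationToken);
        await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, clock.UtcNow, null, cancellationToken);
        await handlerUow.CommitAsync(cancellationToken);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
            "Job was cancelled", cancellationToken);
    }
    catch (Exception ex)
    {
        await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
            $"{ex.GetType().Name}: {ex.Message}", cancellationToken);
    }
}
```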
Class diagram for updated background job dispatching and envelope handling

classDiagram
direction LR
class JobDispatcher {
+JobDispatcher(IServiceScopeFactory scopeFactory, BackgroundJobOptions options, IClock clock, IEventSerializer eventSerializer, ILogger~JobDispatcher~ logger)
+Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
-Task~bool~ IsJobAlreadyProcessedAsync(IJobStore jobStore, Guid jobId, string handlerName, CancellationToken cancellationToken)
-Task MarkJobStatusAsync(IUnitOfWorkManager uowManager, IJobStore jobStore, Guid jobId, BackgroundJobStatus status, string errorMessage, CancellationToken cancellationToken)
-Task InvokeHandlerAsync(IServiceProvider scopedProvider, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
}
class DaprJobExecutionBridge {
+DaprJobExecutionBridge(IServiceScopeFactory scopeFactory, IJobDispatcher jobDispatcher, IEventSerializer eventSerializer, ILogger~DaprJobExecutionBridge~ logger)
+Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
class BackgroundJobInvoker~TArgs~ {
+Task InvokeAsync(IServiceProvider serviceProvider, IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
class IBackgroundJobInvoker {
<<interface>>
+Task InvokeAsync(IServiceProvider scopeFactory, IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
class CloudEventEnvelopeHelper {
<<static>>
+CloudEventEnvelope TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload)
+ReadOnlyMemory~byte~ ExtractDataPayload(IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, out CloudEventEnvelope envelope)
}
class IBackgroundJobHandler~TArgs~ {
<<interface>>
+Task HandleAsync(TArgs args, CancellationToken cancellationToken)
}
class IJobDispatcher {
<<interface>>
+Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
}
class IJobStore {
<<interface>>
+Task~JobInfo~ GetAsync(Guid jobId, CancellationToken cancellationToken)
+Task~JobInfo~ GetByJobNameAsync(string jobName, CancellationToken cancellationToken)
+Task UpdateStatusAsync(Guid jobId, BackgroundJobStatus status, DateTime? timestamp, string error, CancellationToken cancellationToken)
}
class IUnitOfWorkManager {
<<interface>>
+Task~IUnitOfWork~ BeginRequiresNew(CancellationToken cancellationToken)
+Task~IUnitOfWork~ BeginAsync(CancellationToken cancellationToken)
}
class IUnitOfWork {
<<interface>>
+Task CommitAsync(CancellationToken cancellationToken)
+Task RollbackAsync(CancellationToken cancellationToken)
}
class IEventSerializer {
<<interface>>
+T Deserialize~T~(ReadOnlySpan~byte~ payload)
+byte[] Serialize(object data)
}
class ICurrentSchema {
<<interface>>
+void Set(string schema)
}
class CloudEventEnvelope {
+string Type
+string Schema
+object Data
}
class BackgroundJobOptions {
+Dictionary~string, IBackgroundJobInvoker~ Invokers
}
class IServiceScopeFactory {
<<interface>>
+IServiceScope CreateScope()
+IAsyncDisposable CreateAsyncScope()
}
class IServiceProvider {
<<interface>>
+T GetRequiredService~T~()
}
class IJobExecutionBridge {
<<interface>>
+Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
JobDispatcher ..> BackgroundJobOptions
JobDispatcher ..> IClock
JobDispatcher ..> IEventSerializer
JobDispatcher ..> IServiceScopeFactory
JobDispatcher ..> IUnitOfWorkManager
JobDispatcher ..> IJobStore
JobDispatcher ..> IBackgroundJobInvoker
JobDispatcher ..> CloudEventEnvelopeHelper
DaprJobExecutionBridge ..|> IJobExecutionBridge
DaprJobExecutionBridge ..> IServiceScopeFactory
DaprJobExecutionBridge ..> IJobDispatcher
DaprJobExecutionBridge ..> IJobStore
DaprJobExecutionBridge ..> ICurrentSchema
DaprJobExecutionBridge ..> IEventSerializer
DaprJobExecutionBridge ..> CloudEventEnvelopeHelper
BackgroundJobInvoker~TArgs~ ..|> IBackgroundJobInvoker
BackgroundJobInvoker~TArgs~ ..> IBackgroundJobHandler~TArgs~
BackgroundJobInvoker~TArgs~ ..> IEventSerializer
CloudEventEnvelopeHelper ..> CloudEventEnvelope
CloudEventEnvelopeHelper ..> IEventSerializer
IBackgroundJobInvoker <.. BackgroundJobOptions
IBackgroundJobHandler~TArgs~ <.. BackgroundJobInvoker~TArgs~
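The Invokers dictionary on BackgroundJobOptions implies that handlers are registered under a name and looked up per dispatch. The snippet below is a hypothetical wiring example consistent with that shape; SendWelcomeEmailArgs, SendWelcomeEmailHandler, and the registration style are illustrative assumptions, not code from this PR.

```csharp
// Hypothetical registration: one named invoker per argument type, plus the scoped handler it resolves.
var options = new BackgroundJobOptions();
options.Invokers["send-welcome-email"] = new BackgroundJobInvoker<SendWelcomeEmailArgs>();

services.AddSingleton(options);
services.AddScoped<IBackgroundJobHandler<SendWelcomeEmailArgs>, SendWelcomeEmailHandler>();
services.AddScoped<IJobDispatcher, JobDispatcher>();
```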
Caution: Review failed. The pull request is closed.
| Cohort / File(s) | Summary |
|---|---|
| Interface Updates<br>`framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs` | Changed `InvokeAsync` parameter type from `IServiceScopeFactory scopeFactory` to `IServiceProvider scopeFactory` (parameter name unchanged). |
| Core Invoker Implementation<br>`framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs` | Updated method signature to match the interface; the handler is now resolved directly from the provided `IServiceProvider` instead of creating a scoped instance. |
| Envelope Handling Utility<br>`framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs` | New internal static helper class with `TryParseEnvelope` and `ExtractDataPayload` methods for safe deserialization and data extraction from `CloudEventEnvelope` payloads. |
| Dapr Bridge Refactoring<br>`framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs` | Constructor now accepts `IServiceScopeFactory`; removed direct `IJobStore`, `ICurrentSchema`, and `IUnitOfWorkManager` parameters. Creates an async scope per invocation; uses `CloudEventEnvelopeHelper` to extract the payload and handle the schema from the envelope; simplifies error handling. |
| Dispatcher Refactoring<br>`framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs` | Constructor simplified (removed `IJobStore` and `IUnitOfWorkManager`); introduces a per-dispatch async scope. Implements a two-phase unit of work: Phase 1 (idempotency check and status update to Running), Phase 2 (handler execution and status update to Completed). New `MarkJobStatusAsync` helper for status persistence. Envelope schema handled via scoped `ICurrentSchema`. `InvokeHandlerAsync` now accepts a scoped `IServiceProvider`. |
| Minor Formatting<br>`framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs` | Blank line added between UoW creation and handler invocation (no functional change). |
Sequence Diagram
sequenceDiagram
participant DaprBridge as DaprJobExecutionBridge
participant Helper as CloudEventEnvelopeHelper
participant Dispatcher as JobDispatcher
participant JobStore as IJobStore
participant CurrentSchema as ICurrentSchema
participant UoW as IUnitOfWorkManager
participant Invoker as IBackgroundJobInvoker
DaprBridge->>Helper: ExtractDataPayload(payload)
Helper-->>DaprBridge: dataPayload, envelope
DaprBridge->>JobStore: CreateAsyncScope → GetByJobNameAsync()
JobStore-->>DaprBridge: jobInfo
alt Envelope has Schema
DaprBridge->>CurrentSchema: SetCurrentSchemaAsync(envelope.Schema)
end
DaprBridge->>Dispatcher: DispatchAsync(jobInfo, dataPayload)
Dispatcher->>Dispatcher: Phase 1: Create Scope
Dispatcher->>UoW: IsJobAlreadyProcessedAsync()
Dispatcher->>UoW: UpdateStatusToRunning(jobId)
Dispatcher->>UoW: Commit()
Dispatcher->>Dispatcher: Phase 2: Create New Scope
Dispatcher->>Invoker: InvokeHandlerAsync(scopedProvider, handlerName, payload)
Invoker-->>Dispatcher: (success or exception)
alt Success
Dispatcher->>UoW: UpdateStatusToCompleted(jobId)
Dispatcher->>UoW: Commit()
else Failure/Cancellation
Dispatcher->>Dispatcher: MarkJobStatusAsync(status, errorMessage)
end
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes
- JobDispatcher: Two-phase unit-of-work flow with new scope creation per phase; removal of direct DI dependencies from the constructor; revised error handling and status-marking logic requires careful verification of atomicity and consistency (a hedged sketch of the status-marking helper follows this list).
- DaprJobExecutionBridge: Significant refactoring of scope management and envelope handling; removal of legacy envelope logic and schema setting flow.
- CloudEventEnvelopeHelper: New envelope parsing and deserialization logic with safe fallback; verify re-serialization correctness.
- Scope lifecycle changes: Verify that scoped resolution and DI lifetime expectations are met across all affected components.
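The status-marking path called out above does not appear in full in the excerpts below. The following is a sketch of what a MarkJobStatusAsync helper with its own unit of work might look like, inferred from the signature in the class diagram rather than taken from the actual implementation.

```csharp
// Sketch only: the clock field and the exact persistence call are assumptions
// based on the class diagram, not the real JobDispatcher source.
private async Task MarkJobStatusAsync(
    IUnitOfWorkManager uowManager,
    IJobStore jobStore,
    Guid jobId,
    BackgroundJobStatus status,
    string errorMessage,
    CancellationToken cancellationToken)
{
    // A fresh unit of work keeps the status update independent of the handler's
    // (possibly failed or rolled-back) unit of work.
    var uow = await uowManager.BeginRequiresNew(cancellationToken);
    await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, errorMessage, cancellationToken);
    await uow.CommitAsync(cancellationToken);
}
```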
Possibly related PRs
- PR #28: Modifies DaprJobExecutionBridge, JobDispatcher, and scope/UoW handling patterns in background-job execution.
- PR #29: Changes BackgroundJobInvoker, JobDispatcher, and DaprJobExecutionBridge with related constructor and method signature updates for DI scope management.
Suggested reviewers
- middt
🐇 Scopes now dance per dispatch, not per call,
Envelopes unwrapped with helper's careful hall.
Two phases bloom where one did stand,
UoW commits held steady in our hand.
Refactored, resilient—the job flows free! 🌿✨
📒 Files selected for processing (6)
- framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs (1 hunk)
- framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs (1 hunk)
- framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs (2 hunks)
- framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs (6 hunks)
- framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs (1 hunk)
- framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs (1 hunk)
Summary of Changes

Hello @yilmaztayfun, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a substantial refactoring of the background job processing and Dapr job execution mechanisms. The primary goal is to enhance the system's robustness, modularity, and transactional integrity by strategically managing service lifetimes and Unit of Work boundaries. These changes centralize common logic for CloudEvent handling and ensure that job status updates are reliably persisted, contributing to a more resilient background processing infrastructure.
Hey there - I've reviewed your changes - here's some feedback:
- In `IBackgroundJobInvoker.InvokeAsync` and its implementations, the parameter is now an `IServiceProvider` but is still named `scopeFactory`; consider renaming it (and updating the XML docs) to avoid confusion about what is being passed in.
- In `DaprJobExecutionBridge.ExecuteAsync`, the behavior for a missing job has changed from throwing an exception to logging and returning; if callers rely on failure semantics, consider whether this should still surface as an error (e.g., via an exception) or at least be logged at a more prominent level.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `IBackgroundJobInvoker.InvokeAsync` and its implementations, the parameter is now an `IServiceProvider` but still named `scopeFactory`; consider renaming it (and updating XML docs) to avoid confusion about what is being passed in.
- In `DaprJobExecutionBridge.ExecuteAsync`, the behavior for a missing job has changed from throwing an exception to logging and returning; if callers rely on failure semantics, consider whether this should still surface as an error (e.g., via an exception) or at least be logged at a more prominent level.
## Individual Comments
### Comment 1
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs:90-94` </location>
<code_context>
- logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
+ await handlerUow.CommitAsync(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
- await HandleJobCancellationAsync(jobId, cancellationToken);
- throw;
+ await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
+ "Job was cancelled", cancellationToken);
}
catch (Exception ex)
</code_context>
<issue_to_address>
**issue (bug_risk):** Cancellation/failure paths no longer rethrow, which changes the observable behavior for callers of DispatchAsync.
Previously, both cancellation and generic exceptions were rethrown after updating status, allowing callers to detect cancellations/failures via exceptions. Now both OperationCanceledException and other exceptions are swallowed after marking status, so DispatchAsync will appear to succeed even when the job was cancelled or failed, which can break consumers that rely on exception-based flow control or retries. If this behavior change is intended, consider at least rethrowing OperationCanceledException to preserve cancellation semantics, and evaluate whether other exceptions should also continue to propagate.
</issue_to_address>
### Comment 2
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs:43-49` </location>
<code_context>
- logger.LogError("Job with name '{JobName}' not found in store", jobName);
- throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
- }
+ var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>();
+ var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
- // Dispatch to handler using the handler name from job entity
- await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
-
- await uow.CommitAsync(cancellationToken);
- }
- catch (Exception)
+ if (jobInfo == null)
{
- await uow.RollbackAsync(cancellationToken);
- throw;
+ logger.LogError("Job with name '{JobName}' not found in store", jobName);
+ return;
}
+
</code_context>
<issue_to_address>
**question (bug_risk):** Changing missing-job handling from throwing to just logging-and-returning may hide problems in upstream callers.
Previously, a missing job caused an InvalidOperationException, clearly signaling failure to Dapr or any upstream caller. Now we only log and return, so ExecuteAsync will look successful even when the job is missing, which may lead to silent failures (no retries, no visibility beyond logs). If the goal is to suppress retries for missing jobs, this behavior is fine; otherwise, consider still throwing (ideally a specific exception) so upstream logic can detect and react to this condition.
</issue_to_address>
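If the intent is to keep surfacing missing jobs to the caller (and therefore to Dapr's retry machinery), one option is to log and still throw, as the pre-refactor code did. A minimal sketch:

```csharp
if (jobInfo == null)
{
    logger.LogError("Job with name '{JobName}' not found in store", jobName);

    // Rethrowing preserves the pre-refactor failure semantics so upstream callers
    // can still detect and react to the missing-job condition.
    throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
}
```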
### Comment 3
<location> `framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs:25` </location>
<code_context>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task representing the asynchronous operation</returns>
Task InvokeAsync(
- IServiceScopeFactory scopeFactory,
+ IServiceProvider scopeFactory,
IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload,
</code_context>
<issue_to_address>
**nitpick:** The parameter name `scopeFactory` no longer matches its type `IServiceProvider`, which can be confusing.
This mismatch makes the parameter easy to misinterpret (e.g., thinking it can create scopes). Please rename it to something like `serviceProvider` and update implementations accordingly so the name reflects the actual type and usage.
</issue_to_address>
### Comment 4
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs:51` </location>
<code_context>
+ ReadOnlyMemory<byte> payload,
+ out CloudEventEnvelope? envelope)
+ {
+ envelope = TryParseEnvelope(eventSerializer, payload.ToArray());
+
+ if (envelope != null)
</code_context>
<issue_to_address>
**suggestion (performance):** Converting ReadOnlyMemory<byte> to a new byte[] for every call may be unnecessarily expensive.
`ExtractDataPayload` currently calls `payload.ToArray()` and passes that to `TryParseEnvelope`, allocating on every call. Since this helper may be used on hot paths, consider adding a `TryParseEnvelope` overload that takes `ReadOnlySpan<byte>`/`ReadOnlyMemory<byte>` (or updating the serializer to support span-based deserialization) so you can avoid the extra copy.
Suggested implementation:
```csharp
/// <param name="eventSerializer">The event serializer to use.</param>
/// <param name="payload">The raw payload bytes.</param>
/// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
/// <returns>The parsed envelope if successful, null otherwise.</returns>
public static CloudEventEnvelope? TryParseEnvelope(
IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload)
{
// Fast path: payload is backed by a byte[] with no offset – reuse the array without copying.
if (MemoryMarshal.TryGetArray(payload, out ArraySegment<byte> segment) &&
segment.Array is not null &&
segment.Offset == 0 &&
segment.Count == segment.Array.Length)
{
return TryParseEnvelope(eventSerializer, segment.Array);
}
// Fallback: either we have an offset into the array or a non-array-backed memory;
// copy into a new array so we can reuse the existing byte[]-based implementation.
var buffer = payload.ToArray();
return TryParseEnvelope(eventSerializer, buffer);
}
/// <param name="eventSerializer">The event serializer to use.</param>
/// <param name="payload">The raw payload bytes.</param>
/// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
/// <returns>The data payload bytes (either from envelope.Data or original payload).</returns>
public static ReadOnlyMemory<byte> ExtractDataPayload(
IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload,
out CloudEventEnvelope? envelope)
```
```csharp
{
envelope = TryParseEnvelope(eventSerializer, payload);
if (envelope != null)
{
var argsBytes = eventSerializer.Serialize(envelope.Data);
return new ReadOnlyMemory<byte>(argsBytes);
}
return payload;
}
}
```
1. Ensure `using System;` and `using System.Runtime.InteropServices;` are present at the top of `CloudEventEnvelopeHelper.cs` so that `ArraySegment<T>` and `MemoryMarshal` are available.
2. The new `TryParseEnvelope(IEventSerializer, ReadOnlyMemory<byte>)` overload assumes an existing `TryParseEnvelope(IEventSerializer, byte[])` method in the same class. If the name or signature differs, adjust the calls inside the new overload accordingly.
3. If the serializer can be updated to support `ReadOnlySpan<byte>` directly, you can further optimize the new overload by calling that span-based API instead of materializing a `byte[]` with `payload.ToArray()` in the fallback path.
</issue_to_address>
### Comment 5
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs:24-25` </location>
<code_context>
// Resolve dependencies from DI
- await using var scope = scopeFactory.CreateAsyncScope();
- var handler = scope.ServiceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();
+ // await using var scope = scopeFactory.CreateAsyncScope();
+ var handler = serviceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();
var args = eventSerializer.Deserialize<TArgs>(payload.Span);
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The commented-out scope creation and direct use of IServiceProvider could be clarified to avoid lifetime/usage confusion.
This now resolves the handler directly from the passed IServiceProvider while leaving the scope creation commented out, which makes the intended lifetime unclear. Since callers seem to pass a scoped provider, this is likely correct, but to avoid confusion or accidental root-provider use, either remove the commented scope line and rename the parameter to `scopedServiceProvider`, or instead take an IServiceScopeFactory and create a scope here explicitly.
</issue_to_address>
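One way to remove the ambiguity, assuming callers always pass an already-scoped provider (which is how JobDispatcher.InvokeHandlerAsync is described in this PR), is to delete the commented-out scope creation and rename the parameter. This is a sketch only; scopedServiceProvider is a suggested name, not the current code.

```csharp
public async Task InvokeAsync(
    IServiceProvider scopedServiceProvider,
    IEventSerializer eventSerializer,
    ReadOnlyMemory<byte> payload,
    CancellationToken cancellationToken)
{
    // The caller owns the scope; resolve the handler from the scoped provider it passed in.
    var handler = scopedServiceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();
    var args = eventSerializer.Deserialize<TArgs>(payload.Span);

    await handler.HandleAsync(args, cancellationToken);
}
```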
Code Review
This pull request introduces a significant and well-structured refactoring of the background job processing logic. The changes enhance robustness by using separate unit-of-work scopes for status updates, improve multi-tenancy support with schema resolution from CloudEvents, and streamline dependency management by leveraging scoped service providers. The introduction of CloudEventEnvelopeHelper is a good step towards centralizing payload parsing. My review includes a critical point about a change in exception handling that could affect scheduler behavior, a suggestion for a performance improvement in payload processing, and a minor naming clarification in an interface.
```diff
 catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 {
     logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
-    await HandleJobCancellationAsync(jobId, cancellationToken);
-    throw;
+    await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
+        "Job was cancelled", cancellationToken);
 }
 catch (Exception ex)
 {
     logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
-    await HandleJobFailureAsync(jobId, ex, cancellationToken);
-    throw;
+    var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
+    await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
+        errorMessage, cancellationToken);
 }
```
The try-catch block for handler execution now catches and logs exceptions without re-throwing them. This is a significant behavioral change from the previous implementation which re-threw exceptions. By swallowing the exception, the DispatchAsync method will complete successfully from the caller's perspective (e.g., DaprJobExecutionBridge), which may prevent job schedulers like Dapr from performing configured retries on job failure. This could lead to failed jobs being reported as successful to the scheduler. Consider re-throwing the exceptions after marking the job status to ensure failures are propagated to the caller.
```csharp
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
    logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
    await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
        "Job was cancelled", cancellationToken);
    throw;
}
catch (Exception ex)
{
    logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
    var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
    await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
        errorMessage, cancellationToken);
    throw;
}
```

```diff
     /// <returns>A task representing the asynchronous operation</returns>
     Task InvokeAsync(
-        IServiceScopeFactory scopeFactory,
+        IServiceProvider scopeFactory,
```
The parameter name scopeFactory is misleading for a parameter of type IServiceProvider. It should be renamed to serviceProvider to accurately reflect its purpose. Additionally, the XML documentation for this parameter on line 19 is now incorrect and should be updated to describe an IServiceProvider.
```csharp
    IServiceProvider serviceProvider,
```

```csharp
internal static class CloudEventEnvelopeHelper
{
    /// <summary>
    /// Attempts to parse the payload as a CloudEventEnvelope.
    /// Returns null if the payload is not in envelope format (legacy format).
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use for deserialization.</param>
    /// <param name="payload">The raw payload bytes to parse.</param>
    /// <returns>The parsed envelope or null if parsing fails or payload is not in envelope format.</returns>
    public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload)
    {
        try
        {
            var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);

            // Validate it's actually an envelope by checking required properties
            if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
            {
                return envelope;
            }

            return null;
        }
        catch
        {
            return null;
        }
    }

    /// <summary>
    /// Extracts the data payload from a CloudEventEnvelope, serializing it back to bytes.
    /// If payload is not in envelope format, returns the original payload.
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use.</param>
    /// <param name="payload">The raw payload bytes.</param>
    /// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
    /// <returns>The data payload bytes (either from envelope.Data or original payload).</returns>
    public static ReadOnlyMemory<byte> ExtractDataPayload(
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload,
        out CloudEventEnvelope? envelope)
    {
        envelope = TryParseEnvelope(eventSerializer, payload.ToArray());

        if (envelope != null)
        {
            var argsBytes = eventSerializer.Serialize(envelope.Data);
            return new ReadOnlyMemory<byte>(argsBytes);
        }

        return payload;
    }
}
```
The current implementation of ExtractDataPayload calls payload.ToArray(), which creates an unnecessary memory allocation and copy for every job. To improve performance, TryParseEnvelope can be updated to accept a ReadOnlySpan<byte>, and ExtractDataPayload can then pass payload.Span to it, avoiding the overhead. I've also updated the doc comments to reflect this change.
```csharp
internal static class CloudEventEnvelopeHelper
{
    /// <summary>
    /// Attempts to parse the payload as a CloudEventEnvelope.
    /// Returns null if the payload is not in envelope format (legacy format).
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use for deserialization.</param>
    /// <param name="payload">The raw payload span to parse.</param>
    /// <returns>The parsed envelope or null if parsing fails or payload is not in envelope format.</returns>
    public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, ReadOnlySpan<byte> payload)
    {
        try
        {
            var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);

            // Validate it's actually an envelope by checking required properties
            if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
            {
                return envelope;
            }

            return null;
        }
        catch
        {
            return null;
        }
    }

    /// <summary>
    /// Extracts the data payload from a CloudEventEnvelope, serializing it back to bytes.
    /// If payload is not in envelope format, returns the original payload.
    /// </summary>
    /// <param name="eventSerializer">The event serializer to use.</param>
    /// <param name="payload">The raw payload bytes.</param>
    /// <param name="envelope">Output: the parsed envelope if successful, null otherwise.</param>
    /// <returns>The data payload bytes (either from envelope.Data or original payload).</returns>
    public static ReadOnlyMemory<byte> ExtractDataPayload(
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload,
        out CloudEventEnvelope? envelope)
    {
        envelope = TryParseEnvelope(eventSerializer, payload.Span);

        if (envelope != null)
        {
            var argsBytes = eventSerializer.Serialize(envelope.Data);
            return new ReadOnlyMemory<byte>(argsBytes);
        }

        return payload;
    }
}
```
Summary by Sourcery
Refine background job dispatching and Dapr bridge behavior to better support multi-schema payloads, scoped dependency resolution, and reliable status updates.