-
Notifications
You must be signed in to change notification settings - Fork 0
Refactor job dispatching and envelope handling logic #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Refactored background job dispatching to resolve dependencies from IServiceProvider instead of IServiceScopeFactory, and centralized CloudEventEnvelope parsing and data extraction into a new CloudEventEnvelopeHelper. Updated DaprJobExecutionBridge and JobDispatcher to use the helper for schema context and payload extraction, improving multi-tenant support and code reuse. Simplified unit of work handling and error management in job dispatching.
Reviewer's GuideRefactors background job dispatching to resolve dependencies from scoped IServiceProvider instances, centralizes CloudEventEnvelope parsing into a reusable helper, and simplifies unit-of-work and error handling around job execution for better idempotency and multi-tenant schema support. Updated class diagram for job dispatching and CloudEventEnvelope handlingclassDiagram
direction LR
class IJobDispatcher {
<<interface>>
+Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
}
class JobDispatcher {
-IServiceScopeFactory scopeFactory
-BackgroundJobOptions options
-IClock clock
-IEventSerializer eventSerializer
-ILogger~JobDispatcher~ logger
+JobDispatcher(IServiceScopeFactory scopeFactory, BackgroundJobOptions options, IClock clock, IEventSerializer eventSerializer, ILogger~JobDispatcher~ logger)
+Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
-Task~bool~ IsJobAlreadyProcessedAsync(IJobStore jobStore, Guid jobId, string handlerName, CancellationToken cancellationToken)
-Task MarkJobStatusAsync(IUnitOfWorkManager uowManager, IJobStore jobStore, Guid jobId, BackgroundJobStatus status, string errorMessage, CancellationToken cancellationToken)
-Task InvokeHandlerAsync(IServiceProvider scopedProvider, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
}
class BackgroundJobOptions {
+Dictionary~string, IBackgroundJobInvoker~ Invokers
}
class IBackgroundJobInvoker {
<<interface>>
+Task InvokeAsync(IServiceProvider serviceProvider, IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
class BackgroundJobInvoker_TArgs_ {
-BackgroundJobInvoker_TArgs_()
+Task InvokeAsync(IServiceProvider serviceProvider, IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
class IBackgroundJobHandler_TArgs_ {
<<interface>>
+Task HandleAsync(TArgs args, CancellationToken cancellationToken)
}
class DaprJobExecutionBridge {
+DaprJobExecutionBridge(IServiceScopeFactory scopeFactory, IJobDispatcher jobDispatcher, IEventSerializer eventSerializer, ILogger~DaprJobExecutionBridge~ logger)
+Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
class CloudEventEnvelopeHelper {
<<static>>
+CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload)
+ReadOnlyMemory~byte~ ExtractDataPayload(IEventSerializer eventSerializer, ReadOnlyMemory~byte~ payload, out CloudEventEnvelope? envelope)
}
class CloudEventEnvelope {
+string Type
+string Schema
+object Data
}
class IJobStore {
<<interface>>
+Task~BackgroundJobInfo?~ GetAsync(Guid jobId, CancellationToken cancellationToken)
+Task~BackgroundJobInfo?~ GetByJobNameAsync(string jobName, CancellationToken cancellationToken)
+Task UpdateStatusAsync(Guid jobId, BackgroundJobStatus status, DateTime? lastTryTime, string errorMessage, CancellationToken cancellationToken)
+Task UpdateStatusAsync(Guid jobId, BackgroundJobStatus status, CancellationToken cancellationToken)
}
class BackgroundJobInfo {
+Guid Id
+string HandlerName
+BackgroundJobStatus Status
}
class IUnitOfWorkManager {
<<interface>>
+Task~IUnitOfWork~ BeginRequiresNew(CancellationToken cancellationToken)
}
class IUnitOfWork {
<<interface>>
+Task CommitAsync(CancellationToken cancellationToken)
+Task RollbackAsync(CancellationToken cancellationToken)
}
class ICurrentSchema {
<<interface>>
+void Set(string schema)
}
class IEventSerializer {
<<interface>>
+T Deserialize~T~(ReadOnlySpan~byte~ payload)
+byte[] Serialize(object data)
}
class IClock {
<<interface>>
+DateTime UtcNow
}
class BackgroundJobStatus {
<<enum>>
Pending
Running
Completed
Failed
Cancelled
}
IJobDispatcher <|.. JobDispatcher
IBackgroundJobInvoker <|.. BackgroundJobInvoker_TArgs_
IBackgroundJobHandler_TArgs_ <.. BackgroundJobInvoker_TArgs_ : resolves
JobDispatcher --> BackgroundJobOptions : uses
JobDispatcher --> IUnitOfWorkManager : resolves per scope
JobDispatcher --> IJobStore : resolves per scope
JobDispatcher --> IEventSerializer : uses
JobDispatcher --> IClock : uses
JobDispatcher --> IServiceScopeFactory : creates scope
JobDispatcher --> CloudEventEnvelopeHelper : uses
JobDispatcher --> ICurrentSchema : sets
DaprJobExecutionBridge --> IServiceScopeFactory : creates scope
DaprJobExecutionBridge --> IJobDispatcher : dispatches to
DaprJobExecutionBridge --> IJobStore : resolves per scope
DaprJobExecutionBridge --> ICurrentSchema : sets
DaprJobExecutionBridge --> CloudEventEnvelopeHelper : uses
BackgroundJobInvoker_TArgs_ --> IBackgroundJobHandler_TArgs_ : GetRequiredService
BackgroundJobInvoker_TArgs_ --> IEventSerializer : Deserialize
IJobStore --> BackgroundJobInfo : returns
BackgroundJobInfo --> BackgroundJobStatus
CloudEventEnvelopeHelper --> CloudEventEnvelope
CloudEventEnvelopeHelper --> IEventSerializer
IUnitOfWorkManager --> IUnitOfWork
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Caution Review failedThe pull request is closed. Note
|
| Cohort / File(s) | Summary |
|---|---|
Interface & Implementation Updates framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs |
Updated InvokeAsync signature to accept IServiceProvider instead of IServiceScopeFactory; removed scope creation logic and now directly resolves handlers from the provided service provider. |
Scope Management & DI Refactoring framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs |
Introduced scope-based dependency resolution via IServiceScopeFactory; refactored to resolve IJobStore and ICurrentSchema inside scopes. JobDispatcher now implements multi-UoW pattern: first UoW for idempotency/status transition to Running, second UoW after handler invocation for final status. Removed constructor-injected IJobStore and IUnitOfWorkManager. |
Envelope Processing Utilities framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs |
Added new internal helper class with TryParseEnvelope and ExtractDataPayload methods for centralized CloudEventEnvelope parsing and data extraction; removes legacy envelope-handling logic from DaprJobExecutionBridge. |
Minor Updates framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs |
Whitespace-only change (empty line added before handler invocation). |
Sequence Diagram(s)
sequenceDiagram
participant Dapr as DaprJobExecutionBridge
participant Dispatcher as JobDispatcher
participant Helper as CloudEventEnvelopeHelper
participant Scope as IServiceProvider (Scoped)
participant Store as IJobStore
participant Schema as ICurrentSchema
participant UoW as IUnitOfWorkManager
participant Invoker as IBackgroundJobInvoker
Dapr->>Helper: ExtractDataPayload(payload)
Helper->>Helper: TryParseEnvelope()
Helper-->>Dapr: dataPayload, envelope
Dapr->>Scope: CreateScope()
Dapr->>Scope: GetRequiredService(IJobStore)
Scope-->>Store: instance
alt Has Schema in Envelope
Dapr->>Scope: GetRequiredService(ICurrentSchema)
Scope-->>Schema: instance
Dapr->>Schema: SetSchema(envelope.Schema)
end
Dapr->>Store: GetJobByName(jobName)
Store-->>Dapr: job
Dapr->>Dispatcher: DispatchAsync(jobId, handlerName, dataPayload)
rect rgb(200, 220, 240)
Note over Dispatcher: Unit of Work `#1`: Idempotency & Mark Running
Dispatcher->>Scope: GetRequiredService(IJobStore)
Scope-->>Store: instance
Dispatcher->>Store: IsJobAlreadyProcessed(jobId)
Store-->>Dispatcher: processed?
Dispatcher->>Scope: GetRequiredService(IUnitOfWorkManager)
Scope-->>UoW: instance
Dispatcher->>UoW: MarkJobStatusAsync(jobId, Running)
UoW->>UoW: Begin/Commit
end
rect rgb(220, 240, 200)
Note over Dispatcher: Unit of Work `#2`: Handler Invocation
Dispatcher->>Invoker: InvokeAsync(serviceProvider, eventSerializer, dataPayload)
Invoker->>Scope: GetRequiredService(handler)
Scope-->>Invoker: handler instance
Invoker->>Invoker: Execute handler
Invoker-->>Dispatcher: success
Dispatcher->>UoW: MarkJobStatusAsync(jobId, Completed)
UoW->>UoW: Begin/Commit
end
alt Exception or Cancellation
Dispatcher->>UoW: MarkJobStatusAsync(jobId, Failed/Cancelled)
UoW->>UoW: Begin/Commit
end
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes
- Multi-UoW pattern in JobDispatcher: Review the new idempotency-check UoW followed by handler-invocation UoW pattern; verify proper transaction boundaries and state consistency.
- Scope-based dependency resolution: Carefully examine scope creation, lifetime management, and service resolution order in both
DaprJobExecutionBridgeandJobDispatcher. - CloudEventEnvelopeHelper integration: Validate envelope parsing logic, null-handling, and data re-serialization; confirm backward compatibility with non-envelope payloads.
- Constructor signature changes: Verify all call sites of
JobDispatcherandDaprJobExecutionBridgeconstructors are updated. - Schema context setup: Review conditional schema setup logic in
DaprJobExecutionBridgeand ensureICurrentSchemascope isolation.
Possibly related PRs
- Improve event routing and transaction handling in UoW and background … #28: Modifies
JobDispatcherandDaprJobExecutionBridgewith similar scope/DI refactoring and background job dispatch path changes. - Release v1.0 #29: Alters
DaprJobExecutionBridgeandJobDispatcherconstructors and invocation flow in the same background job classes.
Suggested reviewers
- middt
- safakcakir
Poem
🐰 A hop through scopes, a job well done,
Two UoWs dance before the handler's run,
The envelope unfolds, the schema takes hold,
With idempotent checks and statuses bold!
Refactored with care—a dependency affair. ✨
✨ Finishing touches
- 📝 Generate docstrings
🧪 Generate unit tests (beta)
- Create PR with unit tests
- Post copyable unit tests in a comment
- Commit unit tests in branch
hateoas-pagination
📜 Recent review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs(1 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs(1 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs(2 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs(6 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs(1 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs(1 hunks)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Comment @coderabbitai help to get the list of available commands and usage tips.
Summary of ChangesHello @yilmaztayfun, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly refactors the background job dispatching and CloudEvent envelope handling logic within the system. The core change involves shifting dependency resolution from "IServiceScopeFactory" to "IServiceProvider" for job invokers, which streamlines the process. A new "CloudEventEnvelopeHelper" centralizes the extraction of data and schema context from CloudEvent envelopes, enhancing multi-tenancy and code reuse across job execution components. Furthermore, the unit of work management and error handling within the "JobDispatcher" have been simplified and made more robust through the use of distinct transactional scopes for different stages of job processing. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- In JobDispatcher.DispatchAsync the catch blocks for cancellation and general exceptions no longer rethrow, which changes the contract so callers can’t tell a handler failed or was cancelled; please confirm this silent-success behavior is intentional or consider propagating the failure.
- DaprJobExecutionBridge.ExecuteAsync now logs and returns when a job is not found instead of throwing as before, which may change Dapr retry/monitoring behavior; consider whether this should remain an error that surfaces to the caller.
- CloudEventEnvelopeHelper.ExtractDataPayload allocates a new byte[] via payload.ToArray() and then re-serializes envelope.Data; if this runs on a hot path, consider avoiding the extra allocations (e.g., by working directly on ReadOnlyMemory or detecting already-serialized data).
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In JobDispatcher.DispatchAsync the catch blocks for cancellation and general exceptions no longer rethrow, which changes the contract so callers can’t tell a handler failed or was cancelled; please confirm this silent-success behavior is intentional or consider propagating the failure.
- DaprJobExecutionBridge.ExecuteAsync now logs and returns when a job is not found instead of throwing as before, which may change Dapr retry/monitoring behavior; consider whether this should remain an error that surfaces to the caller.
- CloudEventEnvelopeHelper.ExtractDataPayload allocates a new byte[] via payload.ToArray() and then re-serializes envelope.Data; if this runs on a hot path, consider avoiding the extra allocations (e.g., by working directly on ReadOnlyMemory<byte> or detecting already-serialized data).
## Individual Comments
### Comment 1
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs:90-94` </location>
<code_context>
- logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
+ await handlerUow.CommitAsync(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
- await HandleJobCancellationAsync(jobId, cancellationToken);
- throw;
+ await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
+ "Job was cancelled", cancellationToken);
}
catch (Exception ex)
</code_context>
<issue_to_address>
**issue (bug_risk):** Cancellation and failure are no longer propagated to callers, which may break upstream retry/error flows.
Previously, both `OperationCanceledException` and general `Exception` were rethrown after updating job status; now both catch blocks swallow the exception, so `DispatchAsync` appears to succeed even on cancellation or failure. Callers (e.g., Dapr bridge, schedulers, orchestration) that rely on exceptions to drive retries/DLQ, compensating actions, or failure metrics will no longer see these signals.
If the intent is to make dispatch effectively fire-and-forget once status is updated, this needs to be an explicit contract change at this boundary. Otherwise, please rethrow from both catch blocks after `MarkJobStatusAsync` so callers still observe the failure/cancellation while status is persisted.
</issue_to_address>
### Comment 2
<location> `framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs:25` </location>
<code_context>
/// <returns>A task representing the asynchronous operation</returns>
Task InvokeAsync(
- IServiceScopeFactory scopeFactory,
+ IServiceProvider scopeFactory,
IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload,
</code_context>
<issue_to_address>
**suggestion:** The parameter name `scopeFactory` is misleading now that the type is `IServiceProvider`.
In `BackgroundJobInvoker<TArgs>` this parameter is already named `serviceProvider`. Please align the name here with the implementation to reflect the actual type and avoid confusion for interface consumers during future changes.
Suggested implementation:
```csharp
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task representing the asynchronous operation</returns>
Task InvokeAsync(
IServiceProvider serviceProvider,
IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload,
CancellationToken cancellationToken);
```
1. Update all implementations of `IBackgroundJobInvoker.InvokeAsync` to rename the corresponding parameter from `scopeFactory` to `serviceProvider`.
2. Update all call sites of `InvokeAsync` that are using named arguments (if any) to use `serviceProvider` instead of `scopeFactory`.
</issue_to_address>
### Comment 3
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs:43-52` </location>
<code_context>
- logger.LogError("Job with name '{JobName}' not found in store", jobName);
- throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
- }
+ var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>();
+ var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
- // Dispatch to handler using the handler name from job entity
- await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
-
- await uow.CommitAsync(cancellationToken);
- }
- catch (Exception)
+ if (jobInfo == null)
{
- await uow.RollbackAsync(cancellationToken);
- throw;
+ logger.LogError("Job with name '{JobName}' not found in store", jobName);
+ return;
}
</code_context>
<issue_to_address>
**question (bug_risk):** Silently returning when a job is missing changes the contract vs. throwing; this may hide configuration or consistency issues.
Previously, a missing job (`GetByJobNameAsync` returns null) caused an `InvalidOperationException` after logging; now it only logs and returns, so callers of `ExecuteAsync` will see this as success unless they parse logs.
If upstream systems (e.g., Dapr, schedulers, monitoring) depend on failures to surface misconfiguration or race conditions (e.g., job deleted before execution), this behavior change may hide those issues. Please either keep throwing after logging or return an explicit failure signal if the integration supports it, rather than silently returning.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
| { | ||
| logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); | ||
| await HandleJobCancellationAsync(jobId, cancellationToken); | ||
| throw; | ||
| await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled, | ||
| "Job was cancelled", cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Cancellation and failure are no longer propagated to callers, which may break upstream retry/error flows.
Previously, both OperationCanceledException and general Exception were rethrown after updating job status; now both catch blocks swallow the exception, so DispatchAsync appears to succeed even on cancellation or failure. Callers (e.g., Dapr bridge, schedulers, orchestration) that rely on exceptions to drive retries/DLQ, compensating actions, or failure metrics will no longer see these signals.
If the intent is to make dispatch effectively fire-and-forget once status is updated, this needs to be an explicit contract change at this boundary. Otherwise, please rethrow from both catch blocks after MarkJobStatusAsync so callers still observe the failure/cancellation while status is persisted.
| /// <returns>A task representing the asynchronous operation</returns> | ||
| Task InvokeAsync( | ||
| IServiceScopeFactory scopeFactory, | ||
| IServiceProvider scopeFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: The parameter name scopeFactory is misleading now that the type is IServiceProvider.
In BackgroundJobInvoker<TArgs> this parameter is already named serviceProvider. Please align the name here with the implementation to reflect the actual type and avoid confusion for interface consumers during future changes.
Suggested implementation:
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task representing the asynchronous operation</returns>
Task InvokeAsync(
IServiceProvider serviceProvider,
IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload,
CancellationToken cancellationToken);- Update all implementations of
IBackgroundJobInvoker.InvokeAsyncto rename the corresponding parameter fromscopeFactorytoserviceProvider. - Update all call sites of
InvokeAsyncthat are using named arguments (if any) to useserviceProviderinstead ofscopeFactory.
| var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>(); | ||
| var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); | ||
|
|
||
| // Dispatch to handler using the handler name from job entity | ||
| await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); | ||
|
|
||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception) | ||
| if (jobInfo == null) | ||
| { | ||
| await uow.RollbackAsync(cancellationToken); | ||
| throw; | ||
| logger.LogError("Job with name '{JobName}' not found in store", jobName); | ||
| return; | ||
| } | ||
|
|
||
| // Dispatch to handler with extracted data payload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question (bug_risk): Silently returning when a job is missing changes the contract vs. throwing; this may hide configuration or consistency issues.
Previously, a missing job (GetByJobNameAsync returns null) caused an InvalidOperationException after logging; now it only logs and returns, so callers of ExecuteAsync will see this as success unless they parse logs.
If upstream systems (e.g., Dapr, schedulers, monitoring) depend on failures to surface misconfiguration or race conditions (e.g., job deleted before execution), this behavior change may hide those issues. Please either keep throwing after logging or return an explicit failure signal if the integration supports it, rather than silently returning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request refactors background job dispatching and envelope handling, introducing a centralized CloudEventEnvelopeHelper and adjusting dependency resolution to use IServiceProvider from a scope. The changes improve code reuse and unit of work management. My review focuses on significant behavioral changes in error handling that could impact integration with Dapr, particularly around job failure and retry mechanisms. I've also included suggestions to enhance the performance and robustness of the new helper class.
| if (jobInfo == null) | ||
| { | ||
| await uow.RollbackAsync(cancellationToken); | ||
| throw; | ||
| logger.LogError("Job with name '{JobName}' not found in store", jobName); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a job is not found in the store, the method now logs an error and returns. This will likely result in a successful HTTP status code (e.g., 200 OK) being returned to Dapr. The previous implementation threw an exception, which correctly signaled a failure to Dapr and allowed its retry policies to engage. Silently returning on this critical error path could lead to jobs being lost without proper processing or retries. I recommend throwing an exception to signal failure to the calling infrastructure.
if (jobInfo == null)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
}| catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
| { | ||
| logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); | ||
| await HandleJobCancellationAsync(jobId, cancellationToken); | ||
| throw; | ||
| await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled, | ||
| "Job was cancelled", cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId); | ||
| await HandleJobFailureAsync(jobId, ex, cancellationToken); | ||
| throw; | ||
| var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000); | ||
| await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed, | ||
| errorMessage, cancellationToken); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The catch blocks now handle exceptions by updating the job status but no longer re-throw the exception. This prevents the caller (e.g., DaprJobExecutionBridge) from knowing that the job execution failed. As a result, a success status will likely be returned to the job scheduler (like Dapr), which can prevent its built-in retry mechanisms from working correctly. To ensure the calling infrastructure is aware of the failure, the exception should be re-thrown after the job status has been marked as failed or cancelled.
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
"Job was cancelled", cancellationToken);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
errorMessage, cancellationToken);
throw;
}| /// <returns>A task representing the asynchronous operation</returns> | ||
| Task InvokeAsync( | ||
| IServiceScopeFactory scopeFactory, | ||
| IServiceProvider scopeFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload) | ||
| { | ||
| try | ||
| { | ||
| var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload); | ||
|
|
||
| // Validate it's actually an envelope by checking required properties | ||
| if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type)) | ||
| { | ||
| return envelope; | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
| catch | ||
| { | ||
| return null; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TryParseEnvelope method has a couple of areas for improvement:
-
Performance: It accepts a
byte[], which forcesExtractDataPayloadto create a new array fromReadOnlyMemory<byte>usingpayload.ToArray(). This causes an unnecessary memory allocation. You can improve efficiency by changing the parameter type toReadOnlySpan<byte>, asIEventSerializer.Deserializesupports it. You will also need to update the call inExtractDataPayloadto passpayload.Span. -
Error Handling: The
catchblock is empty and catches all exceptions (catch { ... }). This is risky as it can swallow important errors beyond deserialization issues, making debugging difficult. It's better to be more specific and catch only expected exceptions, such asSystem.Text.Json.JsonException.
public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, ReadOnlySpan<byte> payload)
{
try
{
var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);
// Validate it's actually an envelope by checking required properties
if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
{
return envelope;
}
return null;
}
catch (System.Text.Json.JsonException)
{
// Not a valid JSON or not in envelope format, which is an expected case for a "Try" method.
return null;
}
}
|



Refactored background job dispatching to resolve dependencies from IServiceProvider instead of IServiceScopeFactory, and centralized CloudEventEnvelope parsing and data extraction into a new CloudEventEnvelopeHelper. Updated DaprJobExecutionBridge and JobDispatcher to use the helper for schema context and payload extraction, improving multi-tenant support and code reuse. Simplified unit of work handling and error management in job dispatching.
Summary by Sourcery
Refactor background job dispatching and Dapr execution bridge to use scoped service resolution and centralized CloudEvent envelope handling while simplifying unit-of-work and status management.
Enhancements:
Summary by CodeRabbit
Release Notes
Refactor
✏️ Tip: You can customize this high-level summary in your review settings.