-
Notifications
You must be signed in to change notification settings - Fork 0
Release v1.0 #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Release v1.0 #29
Conversation
…jobs Introduces ILocalTransactionEventEnqueuer to enable direct event routing from DbContext to the owning local transaction, ensuring domain events are dispatched to the correct Unit of Work. Refactors background job scheduling to use OnCompleted for post-commit scheduling, adds explicit transaction management in DaprJobExecutionBridge, and removes ambient UoW reliance in JobDispatcher. Updates .gitignore to exclude ai-docs/.
Improve event routing and transaction handling in UoW and background …
|
Caution Review failedThe pull request is closed. Note
|
| Cohort / File(s) | Summary |
|---|---|
Core Event Enqueuing Abstraction framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs |
New public interface defining EnqueueEvents method to enable DbContext to route domain events collected during SaveChanges directly into the owning local transaction. |
DbContext & Transaction Infrastructure framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs |
Adds LocalEventEnqueuer property to AetherDbContext for priority-based event routing. EfCoreTransactionSource establishes bidirectional link between DbContext and local transaction. CompositeUnitOfWork ensures SaveChangesAsync completes before event dispatch logic. |
Background Job Processing framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs |
BackgroundJobService refactors UoW usage to defer job scheduling via OnCompleted callback. DaprJobExecutionBridge introduces UoW scope around job execution flow. JobDispatcher removes explicit UoW management from dispatch paths, delegating to external callers. |
Minor Formatting .gitignore, framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs |
Adds ai-docs/ ignore rule; adjusts whitespace in UnitOfWorkAttribute. |
Sequence Diagram
sequenceDiagram
actor Caller
participant UoW as Unit of Work
participant DbCtx as DbContext
participant Txn as EfCoreLocalTransaction
participant EventSink as Domain Event Sink
Caller->>UoW: BeginAsync
UoW->>Txn: CreateTransaction
Txn->>DbCtx: Set LocalEventEnqueuer
Caller->>DbCtx: SaveChanges (entity operations)
DbCtx->>DbCtx: Collect domain events
DbCtx->>Txn: EnqueueEvents via LocalEventEnqueuer
Txn->>Txn: Store events in local transaction
Caller->>UoW: SaveChangesAsync
DbCtx->>DbCtx: Publish domain events
rect rgb(180, 220, 180)
Note over Caller,EventSink: Event Dispatch Strategy
alt LocalEventEnqueuer set
DbCtx->>Txn: Retrieve enqueued events
Txn->>EventSink: Dispatch events
else Ambient EventSink
DbCtx->>EventSink: Dispatch events
end
end
Caller->>UoW: CommitAsync
UoW->>Txn: Commit
Txn->>Txn: Clear DbContext link
Caller->>Caller: Continue with committed state
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
- Key areas requiring attention:
- Transaction semantics and event ordering: ensure events are properly enqueued before SaveChanges completes and that dispatch order respects the new priority logic
- Background job scheduling deferred execution: verify OnCompleted callbacks execute at the correct transaction boundary
- Job dispatcher flow: confirm removal of explicit UoW management doesn't break error handling or status tracking
- DbContext-to-transaction bidirectional link: verify proper cleanup on disposal and no reference leaks
Possibly related PRs
- PR
#16: Implements transaction-aware event storage and dispatch alongside DbContext event orchestration; complements this PR's introduction ofILocalTransactionEventEnqueuerand local transaction routing mechanism.
Suggested labels
enhancement
Suggested reviewers
- tsimsekburgan
- ikarakayali
Poem
🐰 Domain events hop through transactions now,
Enqueued in local scope, all in a row,
No more ambiguous paths—the enqueuer's the way,
Clear routing and callbacks bring order to the fray! ✨
✨ Finishing touches
- 📝 Generate docstrings
🧪 Generate unit tests (beta)
- Create PR with unit tests
- Post copyable unit tests in a comment
- Commit unit tests in branch
release-v1.0
📜 Recent review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
.gitignore(1 hunks)framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs(1 hunks)framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs(1 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs(8 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs(5 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs(2 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs(3 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs(1 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs(3 hunks)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Comment @coderabbitai help to get the list of available commands and usage tips.
Reviewer's GuideAligns background job scheduling/execution and EF Core unit-of-work handling so jobs are only scheduled after durable commit, job execution runs inside a unit of work, and domain events are reliably routed to the correct local transaction via a new DbContext-to-local-transaction linkage. Sequence diagram for background job enqueue and post-commit schedulingsequenceDiagram
actor Caller
participant BackgroundJobService
participant UowManager
participant Uow
participant JobStore
participant JobScheduler
participant EventSerializer
Caller->>BackgroundJobService: EnqueueAsync(handlerName, jobName, payload, schedule, metadata)
BackgroundJobService->>EventSerializer: Serialize(CloudEventEnvelope)
EventSerializer-->>BackgroundJobService: payloadBytes
BackgroundJobService->>UowManager: BeginAsync(options RequiresNew)
UowManager-->>BackgroundJobService: Uow
BackgroundJobService->>JobStore: SaveAsync(jobInfo)
JobStore-->>BackgroundJobService: (saved)
BackgroundJobService->>Uow: SaveChangesAsync()
Uow-->>BackgroundJobService: (saved)
BackgroundJobService->>Uow: OnCompleted(handler)
Uow-->>BackgroundJobService: (registered)
BackgroundJobService->>Uow: CommitAsync()
Uow-->>BackgroundJobService: (committed)
Uow-->>JobScheduler: ScheduleAsync(handlerName, jobName, schedule, payloadBytes)
JobScheduler-->>Uow: (scheduled)
BackgroundJobService-->>Caller: jobId
alt failure during enqueue
BackgroundJobService->>Uow: RollbackAsync()
Uow-->>BackgroundJobService: (rolled back)
BackgroundJobService-->>Caller: exception
end
Sequence diagram for Dapr job execution within a unit of worksequenceDiagram
participant DaprScheduler
participant DaprJobExecutionBridge
participant UnitOfWorkManager
participant Uow
participant EventSerializer
participant JobStore
participant JobDispatcher
participant Logger
DaprScheduler->>DaprJobExecutionBridge: ExecuteAsync(jobName, payload)
DaprJobExecutionBridge->>EventSerializer: Deserialize(CloudEventEnvelope)
EventSerializer-->>DaprJobExecutionBridge: envelope or null
alt envelope
DaprJobExecutionBridge->>EventSerializer: Serialize(envelope.Data)
EventSerializer-->>DaprJobExecutionBridge: argsPayload
else raw payload
DaprJobExecutionBridge-->>DaprJobExecutionBridge: argsPayload = payload
end
DaprJobExecutionBridge->>UnitOfWorkManager: BeginAsync()
UnitOfWorkManager-->>DaprJobExecutionBridge: Uow
DaprJobExecutionBridge->>JobStore: GetByJobNameAsync(jobName)
JobStore-->>DaprJobExecutionBridge: jobInfo or null
alt jobInfo is null
DaprJobExecutionBridge->>Logger: LogError(job not found)
DaprJobExecutionBridge->>Uow: RollbackAsync()
Uow-->>DaprJobExecutionBridge: (rolled back)
DaprJobExecutionBridge-->>DaprScheduler: exception
else jobInfo found
DaprJobExecutionBridge->>JobDispatcher: DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload)
JobDispatcher-->>DaprJobExecutionBridge: (handled)
DaprJobExecutionBridge->>Uow: CommitAsync()
Uow-->>DaprJobExecutionBridge: (committed)
DaprJobExecutionBridge-->>DaprScheduler: (completed)
end
alt any exception
DaprJobExecutionBridge->>Logger: LogError(ex)
DaprJobExecutionBridge->>Uow: RollbackAsync()
Uow-->>DaprJobExecutionBridge: (rolled back)
DaprJobExecutionBridge-->>DaprScheduler: exception
end
Sequence diagram for domain event routing from DbContext to local transactionsequenceDiagram
participant AppCode
participant EfCoreTransactionSource
participant AetherDbContext
participant EfCoreLocalTransaction
participant CompositeUnitOfWork
participant DomainEventSink
AppCode->>CompositeUnitOfWork: Begin()
CompositeUnitOfWork->>EfCoreTransactionSource: CreateTransactionAsync(context)
EfCoreTransactionSource->>AetherDbContext: (uses context)
EfCoreTransactionSource-->>CompositeUnitOfWork: EfCoreLocalTransaction
EfCoreTransactionSource->>AetherDbContext: set LocalEventEnqueuer(EfCoreLocalTransaction)
AppCode->>AetherDbContext: Modify entities
AppCode->>AetherDbContext: SaveChanges()
AetherDbContext-->>AetherDbContext: CollectDomainEvents()
alt LocalEventEnqueuer is set
AetherDbContext->>EfCoreLocalTransaction: EnqueueEvents(domainEvents)
AetherDbContext-->>AetherDbContext: ClearDomainEvents()
else no LocalEventEnqueuer
alt eventSink is set
AetherDbContext->>DomainEventSink: EnqueueDomainEvents(domainEvents)
AetherDbContext-->>AetherDbContext: ClearDomainEvents()
else no event sink
AetherDbContext-->>AetherDbContext: events remain for manual handling
end
end
AppCode->>CompositeUnitOfWork: CommitAsync()
CompositeUnitOfWork->>CompositeUnitOfWork: SaveChangesAsync() and dispatch collected events
CompositeUnitOfWork->>EfCoreLocalTransaction: DisposeAsync()
EfCoreLocalTransaction->>AetherDbContext: set LocalEventEnqueuer(null)
EfCoreLocalTransaction-->>CompositeUnitOfWork: (disposed)
Updated class diagram for DbContext, local transaction, and background job componentsclassDiagram
class BackgroundJobService {
+EnqueueAsync~TPayload~(string handlerName, string jobName, TPayload payload, string schedule, IDictionary~string,object~ metadata, CancellationToken cancellationToken) Task~Guid~
+UpdateAsync(Guid id, string newSchedule, CancellationToken cancellationToken) Task
+DeleteAsync(Guid id, CancellationToken cancellationToken) Task~bool~
}
class DaprJobExecutionBridge {
+ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken) Task
-ReadOnlyMemory~byte~? TryExtractPayload(ReadOnlyMemory~byte~ payload)
}
class JobDispatcher {
+DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken) Task
-UpdateStatusWithinUowAsync(Guid jobId, BackgroundJobStatus status, string message, CancellationToken cancellationToken) Task
-HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) Task
-HandleJobFailureAsync(Guid jobId, Exception exception, CancellationToken cancellationToken) Task
}
class AetherDbContext~TDbContext~ {
+ILocalTransactionEventEnqueuer LocalEventEnqueuer
+CollectDomainEvents() List~DomainEventEnvelope~
+ClearDomainEvents() void
-PublishDomainEventsToSink() void
+SaveChanges() int
+SaveChangesAsync(CancellationToken cancellationToken) Task~int~
}
class EfCoreTransactionSource~TDbContext~ {
+CreateTransactionAsync(AetherDbContext~TDbContext~ dbContext, IsolationLevel? isolationLevel, CancellationToken cancellationToken) Task~ILocalTransaction~
}
class EfCoreLocalTransaction {
+EnsureTransactionAsync(IsolationLevel? isolationLevel, CancellationToken cancellationToken) Task
+CommitAsync(CancellationToken cancellationToken) Task
+RollbackAsync(CancellationToken cancellationToken) Task
+ClearCollectedEvents() void
+SaveChangesAsync(CancellationToken cancellationToken) Task
+DisposeAsync() ValueTask
+EnqueueEvents(IEnumerable~DomainEventEnvelope~ events) void
-AetherDbContext~TDbContext~ _context
-IDbContextTransaction _transaction
-List~DomainEventEnvelope~ _collectedEvents
}
class CompositeUnitOfWork {
+CommitAsync(CancellationToken cancellationToken) Task
+SaveChangesAsync(CancellationToken cancellationToken) Task
}
class ILocalTransactionEventEnqueuer {
<<interface>>
+EnqueueEvents(IEnumerable~DomainEventEnvelope~ events) void
}
class IUnitOfWorkManager {
<<interface>>
+BeginAsync(UnitOfWorkOptions options, CancellationToken cancellationToken) Task~IUnitOfWork~
+BeginAsync(CancellationToken cancellationToken) Task~IUnitOfWork~
}
class IUnitOfWork {
<<interface>>
+SaveChangesAsync(CancellationToken cancellationToken) Task
+CommitAsync(CancellationToken cancellationToken) Task
+RollbackAsync(CancellationToken cancellationToken) Task
+OnCompleted(Func~IUnitOfWork,Task~ handler) void
}
class IJobStore {
<<interface>>
+SaveAsync(BackgroundJobInfo jobInfo, CancellationToken cancellationToken) Task
+UpdateStatusAsync(Guid id, BackgroundJobStatus status, DateTime timestamp, string reason, CancellationToken cancellationToken) Task
+GetByJobNameAsync(string jobName, CancellationToken cancellationToken) Task~BackgroundJobInfo~
}
class IJobScheduler {
<<interface>>
+ScheduleAsync(string handlerName, string jobName, string schedule, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken) Task
+DeleteAsync(string handlerName, string jobName, CancellationToken cancellationToken) Task
}
class IEventSerializer {
<<interface>>
+Serialize(object value) byte[]
+Deserialize~T~(ReadOnlyMemory~byte~ payload) T
}
BackgroundJobService --> IUnitOfWorkManager : uses
BackgroundJobService --> IJobStore : uses
BackgroundJobService --> IJobScheduler : uses
BackgroundJobService --> IEventSerializer : uses
BackgroundJobService --> IUnitOfWork : manages
DaprJobExecutionBridge --> IUnitOfWorkManager : uses
DaprJobExecutionBridge --> IJobStore : uses
DaprJobExecutionBridge --> JobDispatcher : uses
DaprJobExecutionBridge --> IEventSerializer : uses
JobDispatcher --> IJobStore : uses
JobDispatcher --> IUnitOfWorkManager : uses
EfCoreTransactionSource~TDbContext~ --> AetherDbContext~TDbContext~ : creates transaction for
EfCoreTransactionSource~TDbContext~ --> EfCoreLocalTransaction : returns
EfCoreLocalTransaction ..|> ILocalTransactionEventEnqueuer : implements
AetherDbContext~TDbContext~ --> ILocalTransactionEventEnqueuer : LocalEventEnqueuer
CompositeUnitOfWork --> EfCoreLocalTransaction : coordinates
IUnitOfWorkManager --> IUnitOfWork : creates
IJobStore --> BackgroundJobInfo : persists
JobDispatcher --> BackgroundJobInfo : reads
IJobScheduler --> BackgroundJobInfo : schedules
IUnitOfWork <|.. EfCoreLocalTransaction : participates via owning UoW
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Summary of ChangesHello @yilmaztayfun, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the reliability and correctness of domain event dispatching and background job processing within the framework. By establishing a direct link between Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new mechanism for handling domain events within the Unit of Work (UoW) by establishing a direct link between DbContext and its local transaction. A new interface, ILocalTransactionEventEnqueuer, was added to facilitate this, allowing AetherDbContext to directly enqueue events to the correct transaction via its LocalEventEnqueuer property, which is set by EfCoreTransactionSource. The CompositeUnitOfWork.CommitAsync method was updated to include SaveChangesAsync to ensure changes are persisted before event dispatch. The BackgroundJobService.EnqueueAsync method was refactored to use a RequiresNew UoW scope and to schedule jobs within uow.OnCompleted to prevent race conditions, ensuring jobs are scheduled only after database persistence; a review comment noted a redundant SaveChangesAsync call in this method due to the CompositeUnitOfWork change. Additionally, the DaprJobExecutionBridge now wraps job execution in its own UoW, leading to the removal of internal UoW management from JobDispatcher. A review comment highlighted that job status updates for cancellation and failure in JobDispatcher would be rolled back, recommending these updates be performed in a new, separate transaction using UpdateStatusWithinUowAsync.
| private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) | ||
| { | ||
| try | ||
| { | ||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled, | ||
| clock.UtcNow, "Job was cancelled", cancellationToken); | ||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to update job status to Cancelled"); | ||
| await uow.RollbackAsync(cancellationToken); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The status update to Cancelled in this method will be rolled back. The DispatchAsync method calls this from within a catch block, but the DaprJobExecutionBridge (the caller of DispatchAsync) will roll back the entire Unit of Work upon catching the exception. This means the status update is lost, and the job might remain in a Running state in the database.
To fix this, the status update should be performed in a new, separate transaction. You can achieve this by using the existing helper method UpdateStatusWithinUowAsync.
private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
{
await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Cancelled, "Job was cancelled", cancellationToken);
}| await jobStore.SaveAsync(jobInfo, cancellationToken); | ||
|
|
||
| // Commit transaction | ||
| await uow.SaveChangesAsync(cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and found some issues that need to be addressed.
- In
BackgroundJobService.EnqueueAsync, theOnCompletedcallback reuses the originalcancellationToken, which may be canceled after the transaction commits and prevent the post-commit scheduling; consider using a separate token (orCancellationToken.None) for the scheduler call so that a successfully committed job cannot be silently dropped. - With the removal of the inner unit of work in
JobDispatcher.ExecuteWithinUowAsync, all status updates now rely on an ambient UoW; it would be safer to either assert/guard that a current UoW exists when this method is used or fall back to creating one to avoid non-transactional status changes when called without a surrounding UoW.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `BackgroundJobService.EnqueueAsync`, the `OnCompleted` callback reuses the original `cancellationToken`, which may be canceled after the transaction commits and prevent the post-commit scheduling; consider using a separate token (or `CancellationToken.None`) for the scheduler call so that a successfully committed job cannot be silently dropped.
- With the removal of the inner unit of work in `JobDispatcher.ExecuteWithinUowAsync`, all status updates now rely on an ambient UoW; it would be safer to either assert/guard that a current UoW exists when this method is used or fall back to creating one to avoid non-transactional status changes when called without a surrounding UoW.
## Individual Comments
### Comment 1
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs:105-107` </location>
<code_context>
+ await uow.SaveChangesAsync(cancellationToken);
+ // Register scheduler to run AFTER commit is fully persisted to DB
+ // This prevents race condition where scheduler triggers before DB write completes
+ uow.OnCompleted(async _ =>
+ {
+ await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
+
+ logger.LogInformation(
</code_context>
<issue_to_address>
**issue (bug_risk):** Reconsider using the original CancellationToken in the OnCompleted callback.
If the original `cancellationToken` is canceled by the time `OnCompleted` runs (e.g., HTTP request aborted after the commit), `ScheduleAsync` and logging will be skipped even though the job was successfully persisted. That can leave jobs stored but never scheduled. Use `CancellationToken.None` (or a separate, clearly scoped token) for the post-commit scheduling/logging work, and reserve the original token for transactional operations only.
</issue_to_address>
### Comment 2
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs:53-62` </location>
<code_context>
- var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
- if (jobInfo == null)
+
+ await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
+ try
+ {
+ var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
+ if (jobInfo == null)
+ {
+ logger.LogError("Job with name '{JobName}' not found in store", jobName);
+ throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
+ }
+
+ // Dispatch to handler using the handler name from job entity
+ await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
+
+ await uow.CommitAsync(cancellationToken);
+ }
+ catch (Exception)
</code_context>
<issue_to_address>
**issue (bug_risk):** Status updates on job failure/cancellation are likely being rolled back with the Unit of Work.
Because the bridge now owns the outer UoW, the failure/cancellation path ends up like this:
- `JobDispatcher.ExecuteAsync` sets status to `Running` in its own `RequiresNew` UoW.
- On handler failure/cancellation it calls `HandleJobFailureAsync`/`HandleJobCancellationAsync`, which now rely on the bridge’s ambient UoW.
- The bridge then catches the exception and rolls back that UoW.
So the `Failed`/`Cancelled` updates are rolled back and the job remains `Running` (or older), unlike the previous implementation where those updates were committed in a local UoW.
I’d suggest either keeping failure/cancellation status updates in their own `RequiresNew` UoW, or adjusting the bridge so job metadata commits even when the dispatcher throws (e.g., separating handler side effects from job status persistence).
</issue_to_address>
### Comment 3
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs:61-64` </location>
<code_context>
IDbContextTransaction? transaction)
: ILocalTransaction, ITransactionalLocal, ISupportsSaveChanges, IAsyncDisposable, ILocalTransactionEventEnqueuer
{
+ private readonly AetherDbContext<TDbContext> _context = context;
private IDbContextTransaction? _transaction = transaction;
private readonly List<DomainEventEnvelope> _collectedEvents = new();
</code_context>
<issue_to_address>
**issue (bug_risk):** Clearing LocalEventEnqueuer in DisposeAsync can race with other local transactions on the same DbContext.
`EfCoreLocalTransaction` assigns `dbContext.LocalEventEnqueuer = localTx` on creation and clears it in `DisposeAsync`. With multiple local transactions on the same `DbContext` (nested/overlapping UoWs), a later-disposing older transaction can overwrite/null the `LocalEventEnqueuer` set by a newer one, breaking event routing for the active transaction.
To avoid this, either:
- only clear if it still points to this instance (e.g. `if (ReferenceEquals(_context.LocalEventEnqueuer, this)) ...`), or
- enforce and validate a single-local-transaction-per-context invariant.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| uow.OnCompleted(async _ => | ||
| { | ||
| await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Reconsider using the original CancellationToken in the OnCompleted callback.
If the original cancellationToken is canceled by the time OnCompleted runs (e.g., HTTP request aborted after the commit), ScheduleAsync and logging will be skipped even though the job was successfully persisted. That can leave jobs stored but never scheduled. Use CancellationToken.None (or a separate, clearly scoped token) for the post-commit scheduling/logging work, and reserve the original token for transactional operations only.
| await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken); | ||
| try | ||
| { | ||
| var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); | ||
| if (jobInfo == null) | ||
| { | ||
| logger.LogError("Job with name '{JobName}' not found in store", jobName); | ||
| throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Status updates on job failure/cancellation are likely being rolled back with the Unit of Work.
Because the bridge now owns the outer UoW, the failure/cancellation path ends up like this:
JobDispatcher.ExecuteAsyncsets status toRunningin its ownRequiresNewUoW.- On handler failure/cancellation it calls
HandleJobFailureAsync/HandleJobCancellationAsync, which now rely on the bridge’s ambient UoW. - The bridge then catches the exception and rolls back that UoW.
So the Failed/Cancelled updates are rolled back and the job remains Running (or older), unlike the previous implementation where those updates were committed in a local UoW.
I’d suggest either keeping failure/cancellation status updates in their own RequiresNew UoW, or adjusting the bridge so job metadata commits even when the dispatcher throws (e.g., separating handler side effects from job status persistence).
| private readonly AetherDbContext<TDbContext> _context = context; | ||
| private IDbContextTransaction? _transaction = transaction; | ||
| private readonly List<DomainEventEnvelope> _collectedEvents = new(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Clearing LocalEventEnqueuer in DisposeAsync can race with other local transactions on the same DbContext.
EfCoreLocalTransaction assigns dbContext.LocalEventEnqueuer = localTx on creation and clears it in DisposeAsync. With multiple local transactions on the same DbContext (nested/overlapping UoWs), a later-disposing older transaction can overwrite/null the LocalEventEnqueuer set by a newer one, breaking event routing for the active transaction.
To avoid this, either:
- only clear if it still points to this instance (e.g.
if (ReferenceEquals(_context.LocalEventEnqueuer, this)) ...), or - enforce and validate a single-local-transaction-per-context invariant.
|



Summary by Sourcery
Improve background job execution reliability and domain event routing by tightening unit-of-work boundaries and linking EF Core transactions directly to their owning DbContexts.
Bug Fixes:
Enhancements:
Summary by CodeRabbit
Release Notes
✏️ Tip: You can customize this high-level summary in your review settings.