-
Notifications
You must be signed in to change notification settings - Fork 0
Improve event routing and transaction handling in UoW and background … #28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…jobs Introduces ILocalTransactionEventEnqueuer to enable direct event routing from DbContext to the owning local transaction, ensuring domain events are dispatched to the correct Unit of Work. Refactors background job scheduling to use OnCompleted for post-commit scheduling, adds explicit transaction management in DaprJobExecutionBridge, and removes ambient UoW reliance in JobDispatcher. Updates .gitignore to exclude ai-docs/.
Reviewer's GuideRefactors domain event routing to flow from AetherDbContext through a new ILocalTransactionEventEnqueuer tied to EF Core local transactions, and hardens background job enqueueing and execution to run under explicit Units of Work with post-commit scheduling hooks instead of ambient context reliance. Sequence diagram for background job enqueue with post-commit schedulingsequenceDiagram
actor Caller
participant BackgroundJobService
participant UnitOfWorkManager
participant UnitOfWork
participant JobStore
participant JobScheduler
participant Logger
Caller->>BackgroundJobService: EnqueueAsync(handlerName, jobName, payload, schedule, metadata)
BackgroundJobService->>UnitOfWorkManager: BeginAsync(options Scope=RequiresNew)
UnitOfWorkManager-->>BackgroundJobService: UnitOfWork
BackgroundJobService->>JobStore: SaveAsync(BackgroundJobInfo)
BackgroundJobService->>UnitOfWork: SaveChangesAsync()
BackgroundJobService->>UnitOfWork: OnCompleted(callback)
Note right of UnitOfWork: Registers callback to schedule job after commit
BackgroundJobService->>UnitOfWork: CommitAsync()
UnitOfWork-->>BackgroundJobService: Commit completed
UnitOfWork->>JobScheduler: ScheduleAsync(handlerName, jobName, schedule, payloadBytes)
UnitOfWork->>Logger: LogInformation(scheduled successfully)
BackgroundJobService->>Logger: LogInformation(enqueued successfully)
BackgroundJobService-->>Caller: jobId
alt failure during enqueue
BackgroundJobService->>Logger: LogError(failure)
BackgroundJobService->>UnitOfWork: RollbackAsync()
BackgroundJobService-->>Caller: throws Exception
end
Sequence diagram for Dapr job execution under explicit Unit of WorksequenceDiagram
actor DaprScheduler
participant DaprJobExecutionBridge
participant UnitOfWorkManager
participant UnitOfWork
participant JobStore
participant JobDispatcher
participant Logger
participant EventSerializer
participant CurrentSchema
DaprScheduler->>DaprJobExecutionBridge: ExecuteAsync(jobName, payload)
DaprJobExecutionBridge->>EventSerializer: TryDeserializeEnvelope(payload)
alt envelope payload
EventSerializer-->>DaprJobExecutionBridge: CloudEventEnvelope
DaprJobExecutionBridge->>CurrentSchema: Set(envelope.Schema)
DaprJobExecutionBridge->>EventSerializer: Serialize(envelope.Data)
EventSerializer-->>DaprJobExecutionBridge: argsPayload
else raw payload
DaprJobExecutionBridge-->>DaprJobExecutionBridge: argsPayload = payload
end
DaprJobExecutionBridge->>UnitOfWorkManager: BeginAsync()
UnitOfWorkManager-->>DaprJobExecutionBridge: UnitOfWork
DaprJobExecutionBridge->>JobStore: GetByJobNameAsync(jobName)
alt job not found
JobStore-->>DaprJobExecutionBridge: null
DaprJobExecutionBridge->>Logger: LogError(job not found)
DaprJobExecutionBridge->>UnitOfWork: RollbackAsync()
DaprJobExecutionBridge-->>DaprScheduler: throws InvalidOperationException
else job found
JobStore-->>DaprJobExecutionBridge: jobInfo
DaprJobExecutionBridge->>JobDispatcher: DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload)
DaprJobExecutionBridge->>UnitOfWork: CommitAsync()
DaprJobExecutionBridge-->>DaprScheduler: completed
end
alt unexpected exception
DaprJobExecutionBridge->>UnitOfWork: RollbackAsync()
DaprJobExecutionBridge->>Logger: LogError(executing job)
DaprJobExecutionBridge-->>DaprScheduler: throws Exception
end
Sequence diagram for domain event routing from AetherDbContext to local transactionsequenceDiagram
actor ApplicationCode
participant AetherDbContext
participant LocalTransactionEventEnqueuer
participant DomainEventSink
ApplicationCode->>AetherDbContext: SaveChangesAsync()
AetherDbContext-->>AetherDbContext: CollectDomainEvents()
alt has domain events
alt LocalEventEnqueuer is set
AetherDbContext->>LocalTransactionEventEnqueuer: EnqueueEvents(domainEvents)
AetherDbContext-->>AetherDbContext: ClearDomainEvents()
else eventSink is set
AetherDbContext->>DomainEventSink: EnqueueDomainEvents(domainEvents)
AetherDbContext-->>AetherDbContext: ClearDomainEvents()
else no enqueuer and no sink
AetherDbContext-->>AetherDbContext: Domain events remain for explicit collection
end
else no domain events
AetherDbContext-->>ApplicationCode: SaveChangesAsync result
end
Updated class diagram for Unit of Work, DbContext, and background job componentsclassDiagram
class AetherDbContext~TDbContext~ {
+ILocalTransactionEventEnqueuer LocalEventEnqueuer
-IDomainEventSink eventSink
+void ClearDomainEvents()
-void PublishDomainEventsToSink()
}
class ILocalTransactionEventEnqueuer {
+void EnqueueEvents(IEnumerable~DomainEventEnvelope~ events)
}
class EfCoreTransactionSource~TDbContext~ {
+Task~ILocalTransaction~ CreateTransactionAsync(AetherDbContext~TDbContext~ dbContext, CancellationToken cancellationToken)
}
class EfCoreLocalTransaction {
-AetherDbContext~TDbContext~ _context
-IDbContextTransaction _transaction
-List~DomainEventEnvelope~ _collectedEvents
+Task EnsureTransactionAsync(IsolationLevel? isolationLevel, CancellationToken cancellationToken)
+Task CommitAsync(CancellationToken cancellationToken)
+Task RollbackAsync(CancellationToken cancellationToken)
+void EnqueueEvents(IEnumerable~DomainEventEnvelope~ events)
+void ClearCollectedEvents()
+Task SaveChangesAsync(CancellationToken cancellationToken)
+ValueTask DisposeAsync()
}
class ILocalTransaction {
+Task EnsureTransactionAsync(IsolationLevel? isolationLevel, CancellationToken cancellationToken)
+Task CommitAsync(CancellationToken cancellationToken)
+Task RollbackAsync(CancellationToken cancellationToken)
+void ClearCollectedEvents()
}
class ITransactionalLocal {
}
class ISupportsSaveChanges {
+Task SaveChangesAsync(CancellationToken cancellationToken)
}
class IAsyncDisposable {
+ValueTask DisposeAsync()
}
EfCoreTransactionSource~TDbContext~ --> EfCoreLocalTransaction : creates
AetherDbContext~TDbContext~ --> ILocalTransactionEventEnqueuer : LocalEventEnqueuer
EfCoreLocalTransaction ..|> ILocalTransactionEventEnqueuer
EfCoreLocalTransaction ..|> ILocalTransaction
EfCoreLocalTransaction ..|> ITransactionalLocal
EfCoreLocalTransaction ..|> ISupportsSaveChanges
EfCoreLocalTransaction ..|> IAsyncDisposable
class CompositeUnitOfWork {
+Task CommitAsync(CancellationToken cancellationToken)
+Task SaveChangesAsync(CancellationToken cancellationToken)
-DomainEventOptions domainEventOptions
}
class DomainEventOptions {
+DomainEventDispatchStrategy DispatchStrategy
}
CompositeUnitOfWork --> DomainEventOptions : uses
class BackgroundJobService {
+Task~Guid~ EnqueueAsync~TPayload~(string handlerName, string jobName, TPayload payload, string schedule, Dictionary~string,object~ metadata, CancellationToken cancellationToken)
-IUnitOfWorkManager uowManager
-IJobStore jobStore
-IJobScheduler jobScheduler
-IEventSerializer eventSerializer
-IGuidGenerator guidGenerator
-ICurrentSchema currentSchema
-ILogger~BackgroundJobService~ logger
}
class DaprJobExecutionBridge {
+Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
-IJobDispatcher jobDispatcher
-IJobStore jobStore
-ICurrentSchema currentSchema
-IEventSerializer eventSerializer
-IUnitOfWorkManager unitOfWorkManager
-ILogger~DaprJobExecutionBridge~ logger
}
class JobDispatcher {
+Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
-Task UpdateStatusWithinUowAsync(Guid jobId, BackgroundJobStatus status, string message, CancellationToken cancellationToken)
-Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
-Task HandleJobFailureAsync(Guid jobId, Exception exception, CancellationToken cancellationToken)
-IJobStore jobStore
-IUnitOfWorkManager uowManager
-IClock clock
-ILogger~JobDispatcher~ logger
}
class IUnitOfWorkManager {
+Task~IUnitOfWork~ BeginAsync(UnitOfWorkOptions options, CancellationToken cancellationToken)
+Task~IUnitOfWork~ BeginAsync(CancellationToken cancellationToken)
}
class IUnitOfWork {
+Task CommitAsync(CancellationToken cancellationToken)
+Task RollbackAsync(CancellationToken cancellationToken)
+Task SaveChangesAsync(CancellationToken cancellationToken)
+void OnCompleted(Func~IUnitOfWork,Task~ callback)
}
BackgroundJobService --> IUnitOfWorkManager
BackgroundJobService --> IJobStore
BackgroundJobService --> IJobScheduler
BackgroundJobService --> IEventSerializer
DaprJobExecutionBridge --> IUnitOfWorkManager
DaprJobExecutionBridge --> IJobStore
DaprJobExecutionBridge --> IJobDispatcher
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Caution Review failedThe pull request is closed. Note
|
| Cohort / File(s) | Summary |
|---|---|
Configuration & Formatting .gitignore, framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs |
Added ai-docs/ to ignore list and corrected whitespace formatting in UnitOfWorkAttribute |
Event Enqueuing Contract framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs |
Introduced new interface with EnqueueEvents(IEnumerable<DomainEventEnvelope> events) method for local transaction event collection |
Background Job Transactional Integration framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs |
Wrapped job execution in UnitOfWork scopes; deferred scheduling to post-commit callbacks; added IUnitOfWorkManager dependency to bridge; removed UoW passing from cancellation/failure handlers |
DbContext Event Routing & Priority framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs |
Added LocalEventEnqueuer property to DbContext with priority-based routing (LocalEventEnqueuer first, eventSink fallback); wired DbContext to EfCoreLocalTransaction; implemented EnqueueEvents in transaction source; added lifecycle cleanup in DisposeAsync |
UnitOfWork Commit-Phase Event Dispatch framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs |
Added SaveChangesAsync call before domain event dispatch in CommitAsync to ensure all changes persisted prior to outbox/direct publish strategy |
Sequence Diagram
sequenceDiagram
participant App as Application/DaprService
participant UoW as UnitOfWork
participant DbCtx as DbContext
participant LocalTx as EfCoreLocalTransaction
participant EventMgr as EventEnqueuer
participant Sink as EventSink
rect rgb(200, 220, 240)
Note over App,Sink: New Transactional Event Flow
end
App->>UoW: BeginAsync(RequiresNew)
UoW->>DbCtx: Create/Initialize
DbCtx->>LocalTx: Set LocalEventEnqueuer
App->>DbCtx: Entity Updates
App->>DbCtx: SaveChangesAsync()
rect rgb(220, 240, 220)
Note over DbCtx,LocalTx: SaveChanges Phase - Events Collected
end
DbCtx->>LocalTx: Collect Domain Events
DbCtx->>LocalTx: EnqueueEvents(collected)
LocalTx->>LocalTx: Store in Local Collection
DbCtx->>EventMgr: PublishDomainEventsToSink()
alt LocalEventEnqueuer Available
EventMgr->>LocalTx: Enqueue via LocalEventEnqueuer
else Fallback
EventMgr->>Sink: Enqueue via eventSink
end
rect rgb(240, 220, 220)
Note over UoW,EventMgr: Commit Phase
end
App->>UoW: CommitAsync()
UoW->>DbCtx: Commit Transaction
UoW->>EventMgr: Dispatch Enqueued Events
EventMgr->>Sink: Publish Events
UoW->>App: Complete
rect rgb(240, 240, 220)
Note over UoW,LocalTx: Post-Commit Cleanup
end
App->>LocalTx: DisposeAsync()
LocalTx->>DbCtx: Clear LocalEventEnqueuer Link
LocalTx->>LocalTx: Dispose Transaction
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~50 minutes
Areas requiring extra attention:
- EfCoreTransactionSource.cs: Verify DbContext.LocalEventEnqueuer lifecycle management, disposal cleanup, and field initialization (_context reference)
- AetherDbContext.cs: Confirm priority-based routing logic is correct and backward compatibility with eventSink fallback works as intended
- BackgroundJobService.cs & DaprJobExecutionBridge.cs: Validate transactional scope correctness, race condition prevention between DB write and scheduler, and OnCompleted callback timing
- Event enqueuing during SaveChanges: Ensure domain events collected during SaveChanges are properly routed and not lost if exception occurs
- UnitOfWork composition: Verify SaveChangesAsync call in CompositeUnitOfWork.CommitAsync doesn't duplicate or conflict with entity updates
Possibly related PRs
- 15 support local domain events #16: Both PRs directly modify domain-event and transaction plumbing (AetherDbContext, LocalEventEnqueuer interface, and EF Core transaction sources) to refactor event routing and enqueuing during SaveChanges and commit flows.
Suggested labels
enhancement
Suggested reviewers
- middt
- tsimsekburgan
Poem
🐰 Events now flow with transactional grace,
Deferring their launch to the right time and place,
With queues in the scope and priority's art,
Each domain event plays its committed part! 🌱✨
✨ Finishing touches
- 📝 Generate docstrings
🧪 Generate unit tests (beta)
- Create PR with unit tests
- Post copyable unit tests in a comment
- Commit unit tests in branch
hateoas-pagination
📜 Recent review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
.gitignore(1 hunks)framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs(1 hunks)framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs(1 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs(8 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs(5 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs(2 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs(3 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs(1 hunks)framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs(3 hunks)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Comment @coderabbitai help to get the list of available commands and usage tips.
Summary of ChangesHello @yilmaztayfun, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the reliability and consistency of event routing and background job processing within the system. By introducing a direct event enqueuing mechanism from DbContext to local transactions and refining the Unit of Work management for background jobs, it addresses potential race conditions and ensures that domain events and job scheduling are tightly coupled with transaction commits. These changes lead to a more robust and predictable event-driven architecture. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces several excellent improvements to event routing and transaction handling, especially concerning background jobs. The new ILocalTransactionEventEnqueuer provides a much more reliable way to route domain events to the correct Unit of Work. Using OnCompleted for scheduling background jobs is a smart move to prevent race conditions. The explicit transaction management in DaprJobExecutionBridge and removing ambient UoW reliance in JobDispatcher are also great changes for robustness. My review includes one high-severity suggestion regarding cancellation token handling within the OnCompleted callback to prevent potential inconsistencies in job scheduling.
| uow.OnCompleted(async _ => | ||
| { | ||
| await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); | ||
|
|
||
| logger.LogInformation( | ||
| "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", | ||
| handlerName, jobName, jobId); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The OnCompleted handler captures the cancellationToken from the EnqueueAsync method's scope. This could lead to an issue where if the original request is cancelled after the transaction has been committed but before the OnCompleted handler finishes, the job scheduling via jobScheduler.ScheduleAsync would also be cancelled. This would leave the system in an inconsistent state where the BackgroundJobInfo entity is persisted in the database, but the job is not actually scheduled to run. Since the OnCompleted handler executes after the commit is successful, its operation should be decoupled from the cancellation of the original request. Using CancellationToken.None will ensure the scheduling proceeds regardless of the original request's status post-commit, making the process more robust.
uow.OnCompleted(async _ =>
{
// Use CancellationToken.None to ensure scheduling is not cancelled
// by the original request's token after the UoW has committed.
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, CancellationToken.None);
logger.LogInformation(
"Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);
});There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- In BackgroundJobService.EnqueueAsync the OnCompleted callback captures the original CancellationToken; consider using a non-cancellable token (or explicitly documenting the behavior) so that post-commit scheduling isn’t skipped simply because the caller cancelled after persistence succeeded.
- BackgroundJobService.UpdateAsync still schedules the updated job before persisting the new schedule; to avoid the same race you fixed in EnqueueAsync, consider moving scheduling into an OnCompleted callback after SaveChanges/commit.
- In JobDispatcher, the success path uses UpdateStatusWithinUowAsync (with a new UoW per status change) while the cancellation/failure paths now update status without any explicit UoW; it may be worth aligning these paths so all status transitions follow a consistent transactional pattern.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In BackgroundJobService.EnqueueAsync the OnCompleted callback captures the original CancellationToken; consider using a non-cancellable token (or explicitly documenting the behavior) so that post-commit scheduling isn’t skipped simply because the caller cancelled after persistence succeeded.
- BackgroundJobService.UpdateAsync still schedules the updated job before persisting the new schedule; to avoid the same race you fixed in EnqueueAsync, consider moving scheduling into an OnCompleted callback after SaveChanges/commit.
- In JobDispatcher, the success path uses UpdateStatusWithinUowAsync (with a new UoW per status change) while the cancellation/failure paths now update status without any explicit UoW; it may be worth aligning these paths so all status transitions follow a consistent transactional pattern.
## Individual Comments
### Comment 1
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs:105-107` </location>
<code_context>
+ await uow.SaveChangesAsync(cancellationToken);
+ // Register scheduler to run AFTER commit is fully persisted to DB
+ // This prevents race condition where scheduler triggers before DB write completes
+ uow.OnCompleted(async _ =>
+ {
+ await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
+
+ logger.LogInformation(
</code_context>
<issue_to_address>
**issue (bug_risk):** Reusing the original cancellation token in OnCompleted can leave the job persisted but never scheduled.
Because the callback runs after the transaction commits, reusing the `EnqueueAsync` `cancellationToken` means a cancellation between commit and callback can prevent scheduling while the job is already persisted. Use `CancellationToken.None` (or a dedicated timeout token) in `OnCompleted`, and reserve the original token for operations up to and including `SaveChangesAsync`.
</issue_to_address>
### Comment 2
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs:61-64` </location>
<code_context>
IDbContextTransaction? transaction)
: ILocalTransaction, ITransactionalLocal, ISupportsSaveChanges, IAsyncDisposable, ILocalTransactionEventEnqueuer
{
+ private readonly AetherDbContext<TDbContext> _context = context;
private IDbContextTransaction? _transaction = transaction;
private readonly List<DomainEventEnvelope> _collectedEvents = new();
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Clearing LocalEventEnqueuer in DisposeAsync could race if the context outlives the transaction or is reused.
`EfCoreLocalTransaction` holds `_context` and sets `dbContext.LocalEventEnqueuer = localTx` on creation, then always clears it in `DisposeAsync`. If an `AetherDbContext` instance can later be bound to a different transaction, disposing the old one would null out `LocalEventEnqueuer` for the new transaction. If that reuse is possible, guard the clear (only null if it still equals this instance) or adjust the ownership model to prevent cross-transaction interference.
Suggested implementation:
```csharp
private readonly AetherDbContext<TDbContext> _context = context;
private IDbContextTransaction? _transaction = transaction;
private readonly List<DomainEventEnvelope> _collectedEvents = new();
```
```csharp
_transaction = isolationLevel.HasValue
? await _context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken)
: await _context.Database.BeginTransactionAsync(cancellationToken);
}
```
```csharp
public async ValueTask DisposeAsync()
{
// Only clear LocalEventEnqueuer if it still belongs to this transaction instance
if (ReferenceEquals(_context.LocalEventEnqueuer, this))
{
_context.LocalEventEnqueuer = null;
}
if (_transaction != null)
{
await _transaction.DisposeAsync();
_transaction = null;
}
}
```
If `DisposeAsync` in your file has a slightly different structure or additional logic, apply the same pattern at the point where `LocalEventEnqueuer` is cleared:
1. Wrap the assignment to `LocalEventEnqueuer = null` in a `ReferenceEquals(_context.LocalEventEnqueuer, this)` check.
2. Ensure all usages of the constructor parameter `context` inside `EfCoreLocalTransaction` are switched to the `_context` field for consistency, e.g. `context.LocalEventEnqueuer` → `_context.LocalEventEnqueuer`.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| uow.OnCompleted(async _ => | ||
| { | ||
| await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Reusing the original cancellation token in OnCompleted can leave the job persisted but never scheduled.
Because the callback runs after the transaction commits, reusing the EnqueueAsync cancellationToken means a cancellation between commit and callback can prevent scheduling while the job is already persisted. Use CancellationToken.None (or a dedicated timeout token) in OnCompleted, and reserve the original token for operations up to and including SaveChangesAsync.
| private readonly AetherDbContext<TDbContext> _context = context; | ||
| private IDbContextTransaction? _transaction = transaction; | ||
| private readonly List<DomainEventEnvelope> _collectedEvents = new(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Clearing LocalEventEnqueuer in DisposeAsync could race if the context outlives the transaction or is reused.
EfCoreLocalTransaction holds _context and sets dbContext.LocalEventEnqueuer = localTx on creation, then always clears it in DisposeAsync. If an AetherDbContext instance can later be bound to a different transaction, disposing the old one would null out LocalEventEnqueuer for the new transaction. If that reuse is possible, guard the clear (only null if it still equals this instance) or adjust the ownership model to prevent cross-transaction interference.
Suggested implementation:
private readonly AetherDbContext<TDbContext> _context = context;
private IDbContextTransaction? _transaction = transaction;
private readonly List<DomainEventEnvelope> _collectedEvents = new(); _transaction = isolationLevel.HasValue
? await _context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken)
: await _context.Database.BeginTransactionAsync(cancellationToken);
} public async ValueTask DisposeAsync()
{
// Only clear LocalEventEnqueuer if it still belongs to this transaction instance
if (ReferenceEquals(_context.LocalEventEnqueuer, this))
{
_context.LocalEventEnqueuer = null;
}
if (_transaction != null)
{
await _transaction.DisposeAsync();
_transaction = null;
}
}If DisposeAsync in your file has a slightly different structure or additional logic, apply the same pattern at the point where LocalEventEnqueuer is cleared:
- Wrap the assignment to
LocalEventEnqueuer = nullin aReferenceEquals(_context.LocalEventEnqueuer, this)check. - Ensure all usages of the constructor parameter
contextinsideEfCoreLocalTransactionare switched to the_contextfield for consistency, e.g.context.LocalEventEnqueuer→_context.LocalEventEnqueuer.
|



…jobs
Introduces ILocalTransactionEventEnqueuer to enable direct event routing from DbContext to the owning local transaction, ensuring domain events are dispatched to the correct Unit of Work. Refactors background job scheduling to use OnCompleted for post-commit scheduling, adds explicit transaction management in DaprJobExecutionBridge, and removes ambient UoW reliance in JobDispatcher. Updates .gitignore to exclude ai-docs/.
Summary by Sourcery
Route domain events directly from EF Core DbContext to their owning local transactions and decouple background job execution from ambient units of work while ensuring jobs are scheduled only after commits complete.
Enhancements:
Summary by CodeRabbit
Infrastructure Improvements
Chores
✏️ Tip: You can customize this high-level summary in your review settings.