Skip to content

Conversation

@yilmaztayfun
Copy link
Contributor

@yilmaztayfun yilmaztayfun commented Dec 9, 2025

…jobs

Introduces ILocalTransactionEventEnqueuer to enable direct event routing from DbContext to the owning local transaction, ensuring domain events are dispatched to the correct Unit of Work. Refactors background job scheduling to use OnCompleted for post-commit scheduling, adds explicit transaction management in DaprJobExecutionBridge, and removes ambient UoW reliance in JobDispatcher. Updates .gitignore to exclude ai-docs/.

Summary by Sourcery

Route domain events directly from EF Core DbContext to their owning local transactions and decouple background job execution from ambient units of work while ensuring jobs are scheduled only after commits complete.

Enhancements:

  • Add a local transaction event enqueuer contract and wire EF Core DbContext to push domain events directly into the owning local transaction, falling back to ambient UoW-based sinks.
  • Adjust EF Core transaction source to register itself as the DbContext's event enqueuer for the lifetime of the local transaction and ensure changes are saved before domain event dispatch.
  • Refine background job enqueueing to use a new, isolated unit of work and schedule jobs via post-commit callbacks to avoid race conditions with the scheduler.
  • Update Dapr job execution to run within an explicit unit of work and commit or roll back around job dispatch, and simplify JobDispatcher to rely on the existing UoW instead of creating a new one for status updates.

Summary by CodeRabbit

  • Infrastructure Improvements

    • Background job scheduling now aligns with transaction completion.
    • Transaction management flow has been refined with improved event routing and enqueuing capabilities.
    • Database context coordination with transactional scopes has been enhanced for better operational sequencing and consistency.
  • Chores

    • Updated repository configuration and ignore patterns.

✏️ Tip: You can customize this high-level summary in your review settings.

…jobs

Introduces ILocalTransactionEventEnqueuer to enable direct event routing from DbContext to the owning local transaction, ensuring domain events are dispatched to the correct Unit of Work. Refactors background job scheduling to use OnCompleted for post-commit scheduling, adds explicit transaction management in DaprJobExecutionBridge, and removes ambient UoW reliance in JobDispatcher. Updates .gitignore to exclude ai-docs/.
@yilmaztayfun yilmaztayfun requested review from a team as code owners December 9, 2025 11:07
@yilmaztayfun yilmaztayfun requested review from middt and removed request for a team December 9, 2025 11:07
@sourcery-ai
Copy link

sourcery-ai bot commented Dec 9, 2025

Reviewer's Guide

Refactors domain event routing to flow from AetherDbContext through a new ILocalTransactionEventEnqueuer tied to EF Core local transactions, and hardens background job enqueueing and execution to run under explicit Units of Work with post-commit scheduling hooks instead of ambient context reliance.

Sequence diagram for background job enqueue with post-commit scheduling

sequenceDiagram
    actor Caller
    participant BackgroundJobService
    participant UnitOfWorkManager
    participant UnitOfWork
    participant JobStore
    participant JobScheduler
    participant Logger

    Caller->>BackgroundJobService: EnqueueAsync(handlerName, jobName, payload, schedule, metadata)
    BackgroundJobService->>UnitOfWorkManager: BeginAsync(options Scope=RequiresNew)
    UnitOfWorkManager-->>BackgroundJobService: UnitOfWork

    BackgroundJobService->>JobStore: SaveAsync(BackgroundJobInfo)
    BackgroundJobService->>UnitOfWork: SaveChangesAsync()

    BackgroundJobService->>UnitOfWork: OnCompleted(callback)
    Note right of UnitOfWork: Registers callback to schedule job after commit

    BackgroundJobService->>UnitOfWork: CommitAsync()
    UnitOfWork-->>BackgroundJobService: Commit completed

    UnitOfWork->>JobScheduler: ScheduleAsync(handlerName, jobName, schedule, payloadBytes)
    UnitOfWork->>Logger: LogInformation(scheduled successfully)

    BackgroundJobService->>Logger: LogInformation(enqueued successfully)
    BackgroundJobService-->>Caller: jobId

    alt failure during enqueue
        BackgroundJobService->>Logger: LogError(failure)
        BackgroundJobService->>UnitOfWork: RollbackAsync()
        BackgroundJobService-->>Caller: throws Exception
    end
Loading

Sequence diagram for Dapr job execution under explicit Unit of Work

sequenceDiagram
    actor DaprScheduler
    participant DaprJobExecutionBridge
    participant UnitOfWorkManager
    participant UnitOfWork
    participant JobStore
    participant JobDispatcher
    participant Logger
    participant EventSerializer
    participant CurrentSchema

    DaprScheduler->>DaprJobExecutionBridge: ExecuteAsync(jobName, payload)

    DaprJobExecutionBridge->>EventSerializer: TryDeserializeEnvelope(payload)
    alt envelope payload
        EventSerializer-->>DaprJobExecutionBridge: CloudEventEnvelope
        DaprJobExecutionBridge->>CurrentSchema: Set(envelope.Schema)
        DaprJobExecutionBridge->>EventSerializer: Serialize(envelope.Data)
        EventSerializer-->>DaprJobExecutionBridge: argsPayload
    else raw payload
        DaprJobExecutionBridge-->>DaprJobExecutionBridge: argsPayload = payload
    end

    DaprJobExecutionBridge->>UnitOfWorkManager: BeginAsync()
    UnitOfWorkManager-->>DaprJobExecutionBridge: UnitOfWork

    DaprJobExecutionBridge->>JobStore: GetByJobNameAsync(jobName)
    alt job not found
        JobStore-->>DaprJobExecutionBridge: null
        DaprJobExecutionBridge->>Logger: LogError(job not found)
        DaprJobExecutionBridge->>UnitOfWork: RollbackAsync()
        DaprJobExecutionBridge-->>DaprScheduler: throws InvalidOperationException
    else job found
        JobStore-->>DaprJobExecutionBridge: jobInfo
        DaprJobExecutionBridge->>JobDispatcher: DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload)
        DaprJobExecutionBridge->>UnitOfWork: CommitAsync()
        DaprJobExecutionBridge-->>DaprScheduler: completed
    end

    alt unexpected exception
        DaprJobExecutionBridge->>UnitOfWork: RollbackAsync()
        DaprJobExecutionBridge->>Logger: LogError(executing job)
        DaprJobExecutionBridge-->>DaprScheduler: throws Exception
    end
Loading

Sequence diagram for domain event routing from AetherDbContext to local transaction

sequenceDiagram
    actor ApplicationCode
    participant AetherDbContext
    participant LocalTransactionEventEnqueuer
    participant DomainEventSink

    ApplicationCode->>AetherDbContext: SaveChangesAsync()
    AetherDbContext-->>AetherDbContext: CollectDomainEvents()

    alt has domain events
        alt LocalEventEnqueuer is set
            AetherDbContext->>LocalTransactionEventEnqueuer: EnqueueEvents(domainEvents)
            AetherDbContext-->>AetherDbContext: ClearDomainEvents()
        else eventSink is set
            AetherDbContext->>DomainEventSink: EnqueueDomainEvents(domainEvents)
            AetherDbContext-->>AetherDbContext: ClearDomainEvents()
        else no enqueuer and no sink
            AetherDbContext-->>AetherDbContext: Domain events remain for explicit collection
        end
    else no domain events
        AetherDbContext-->>ApplicationCode: SaveChangesAsync result
    end
Loading

Updated class diagram for Unit of Work, DbContext, and background job components

classDiagram
    class AetherDbContext~TDbContext~ {
        +ILocalTransactionEventEnqueuer LocalEventEnqueuer
        -IDomainEventSink eventSink
        +void ClearDomainEvents()
        -void PublishDomainEventsToSink()
    }

    class ILocalTransactionEventEnqueuer {
        +void EnqueueEvents(IEnumerable~DomainEventEnvelope~ events)
    }

    class EfCoreTransactionSource~TDbContext~ {
        +Task~ILocalTransaction~ CreateTransactionAsync(AetherDbContext~TDbContext~ dbContext, CancellationToken cancellationToken)
    }

    class EfCoreLocalTransaction {
        -AetherDbContext~TDbContext~ _context
        -IDbContextTransaction _transaction
        -List~DomainEventEnvelope~ _collectedEvents
        +Task EnsureTransactionAsync(IsolationLevel? isolationLevel, CancellationToken cancellationToken)
        +Task CommitAsync(CancellationToken cancellationToken)
        +Task RollbackAsync(CancellationToken cancellationToken)
        +void EnqueueEvents(IEnumerable~DomainEventEnvelope~ events)
        +void ClearCollectedEvents()
        +Task SaveChangesAsync(CancellationToken cancellationToken)
        +ValueTask DisposeAsync()
    }

    class ILocalTransaction {
        +Task EnsureTransactionAsync(IsolationLevel? isolationLevel, CancellationToken cancellationToken)
        +Task CommitAsync(CancellationToken cancellationToken)
        +Task RollbackAsync(CancellationToken cancellationToken)
        +void ClearCollectedEvents()
    }

    class ITransactionalLocal {
    }

    class ISupportsSaveChanges {
        +Task SaveChangesAsync(CancellationToken cancellationToken)
    }

    class IAsyncDisposable {
        +ValueTask DisposeAsync()
    }

    EfCoreTransactionSource~TDbContext~ --> EfCoreLocalTransaction : creates
    AetherDbContext~TDbContext~ --> ILocalTransactionEventEnqueuer : LocalEventEnqueuer
    EfCoreLocalTransaction ..|> ILocalTransactionEventEnqueuer
    EfCoreLocalTransaction ..|> ILocalTransaction
    EfCoreLocalTransaction ..|> ITransactionalLocal
    EfCoreLocalTransaction ..|> ISupportsSaveChanges
    EfCoreLocalTransaction ..|> IAsyncDisposable

    class CompositeUnitOfWork {
        +Task CommitAsync(CancellationToken cancellationToken)
        +Task SaveChangesAsync(CancellationToken cancellationToken)
        -DomainEventOptions domainEventOptions
    }

    class DomainEventOptions {
        +DomainEventDispatchStrategy DispatchStrategy
    }

    CompositeUnitOfWork --> DomainEventOptions : uses

    class BackgroundJobService {
        +Task~Guid~ EnqueueAsync~TPayload~(string handlerName, string jobName, TPayload payload, string schedule, Dictionary~string,object~ metadata, CancellationToken cancellationToken)
        -IUnitOfWorkManager uowManager
        -IJobStore jobStore
        -IJobScheduler jobScheduler
        -IEventSerializer eventSerializer
        -IGuidGenerator guidGenerator
        -ICurrentSchema currentSchema
        -ILogger~BackgroundJobService~ logger
    }

    class DaprJobExecutionBridge {
        +Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
        -IJobDispatcher jobDispatcher
        -IJobStore jobStore
        -ICurrentSchema currentSchema
        -IEventSerializer eventSerializer
        -IUnitOfWorkManager unitOfWorkManager
        -ILogger~DaprJobExecutionBridge~ logger
    }

    class JobDispatcher {
        +Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
        -Task UpdateStatusWithinUowAsync(Guid jobId, BackgroundJobStatus status, string message, CancellationToken cancellationToken)
        -Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
        -Task HandleJobFailureAsync(Guid jobId, Exception exception, CancellationToken cancellationToken)
        -IJobStore jobStore
        -IUnitOfWorkManager uowManager
        -IClock clock
        -ILogger~JobDispatcher~ logger
    }

    class IUnitOfWorkManager {
        +Task~IUnitOfWork~ BeginAsync(UnitOfWorkOptions options, CancellationToken cancellationToken)
        +Task~IUnitOfWork~ BeginAsync(CancellationToken cancellationToken)
    }

    class IUnitOfWork {
        +Task CommitAsync(CancellationToken cancellationToken)
        +Task RollbackAsync(CancellationToken cancellationToken)
        +Task SaveChangesAsync(CancellationToken cancellationToken)
        +void OnCompleted(Func~IUnitOfWork,Task~ callback)
    }

    BackgroundJobService --> IUnitOfWorkManager
    BackgroundJobService --> IJobStore
    BackgroundJobService --> IJobScheduler
    BackgroundJobService --> IEventSerializer

    DaprJobExecutionBridge --> IUnitOfWorkManager
    DaprJobExecutionBridge --> IJobStore
    DaprJobExecutionBridge --> IJobDispatcher
Loading

File-Level Changes

Change Details Files
Make background job enqueueing transactional and schedule jobs only after the job record is fully committed.
  • Wrap BackgroundJobService.EnqueueAsync in a new-scoped Unit of Work (RequiresNew) and call SaveChangesAsync before commit.
  • Register jobScheduler.ScheduleAsync in an OnCompleted callback so scheduling happens only after commit finishes.
  • Adjust logging in enqueue flow and keep update/delete logic mostly intact aside from minor formatting and parameter wrapping.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
Execute Dapr-triggered background jobs inside an explicit Unit of Work so handler dispatch and job state updates are transactional.
  • Inject IUnitOfWorkManager into DaprJobExecutionBridge and begin a Unit of Work in ExecuteAsync.
  • Move jobStore.GetByJobNameAsync and jobDispatcher.DispatchAsync inside the Unit of Work, committing on success and rolling back on failure.
  • Retain the existing envelope-unwrapping logic while adjusting formatting and error handling paths.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
Route DbContext domain events primarily through a transaction-bound enqueuer instead of only the ambient event sink.
  • Add LocalEventEnqueuer property to AetherDbContext to let EF local transactions receive domain events directly.
  • Change PublishDomainEventsToSink to collect events once, then prefer LocalEventEnqueuer.EnqueueEvents, falling back to the existing eventSink only when no local enqueuer is set.
  • Ensure domain events are cleared only when they are actually enqueued via one of the two paths.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs
Tie EF Core local transactions to their DbContext and expose event enqueueing from the transaction side.
  • Update EfCoreTransactionSource.CreateTransactionAsync to create an EfCoreLocalTransaction, assign it to dbContext.LocalEventEnqueuer, and return it.
  • Have EfCoreLocalTransaction implement ILocalTransactionEventEnqueuer, store the DbContext reference, and use it consistently instead of the previous captured variable.
  • Clear dbContext.LocalEventEnqueuer in EfCoreLocalTransaction.DisposeAsync to avoid stale links, and move the ILocalTransactionEventEnqueuer interface into a shared core location.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs
framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs
Rely on the surrounding Unit of Work in JobDispatcher instead of creating and managing its own.
  • Remove the inner UnitOfWork creation and commit/rollback from JobDispatcher.DispatchAsync, assuming an ambient Unit of Work exists when it is called.
  • Simplify HandleJobCancellationAsync and HandleJobFailureAsync to update status without directly committing or rolling back a Unit of Work, leaving that to the outer scope.
  • Keep status transitions (Running, Completed, Cancelled, Failed) but adjust methods to no longer take an IUnitOfWork parameter.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
Align composite Unit of Work commit flow with the domain event dispatch strategy by ensuring changes are saved before dispatching.
  • Call SaveChangesAsync at the start of CompositeUnitOfWork.CommitAsync so persistence precedes domain event dispatch.
  • Leave the selection of domain event dispatch strategy (AlwaysUseOutbox vs others) unchanged, just altering the ordering.
  • Make minor whitespace-only changes to UnitOfWorkAttribute for formatting consistency.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs
framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs

Possibly linked issues

  • #Support Local Domain Events: PR wires DbContext domain events into local transactions/UoW via ILocalTransactionEventEnqueuer, implementing the local domain events support.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@yilmaztayfun yilmaztayfun merged commit 86a67a5 into release-v1.0 Dec 9, 2025
1 of 5 checks passed
@coderabbitai
Copy link

coderabbitai bot commented Dec 9, 2025

Caution

Review failed

The pull request is closed.

Note

.coderabbit.yaml has unrecognized properties

CodeRabbit is using all valid settings from your configuration. Unrecognized properties (listed below) have been ignored and may indicate typos or deprecated fields that can be removed.

⚠️ Parsing warnings (1)
Validation error: Unrecognized key(s) in object: 'review'
⚙️ Configuration instructions
  • Please see the configuration documentation for more information.
  • You can also validate your configuration using the online YAML validator.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Walkthrough

Refactored domain event handling to defer operations until transaction commit. New ILocalTransactionEventEnqueuer interface enables local event enqueuing during SaveChanges. DbContext now routes events via LocalEventEnqueuer with fallback to eventSink. Background jobs and Dapr execution defer scheduling/dispatch to post-commit callbacks within UnitOfWork scopes.

Changes

Cohort / File(s) Summary
Configuration & Formatting
.gitignore, framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs
Added ai-docs/ to ignore list and corrected whitespace formatting in UnitOfWorkAttribute
Event Enqueuing Contract
framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs
Introduced new interface with EnqueueEvents(IEnumerable<DomainEventEnvelope> events) method for local transaction event collection
Background Job Transactional Integration
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
Wrapped job execution in UnitOfWork scopes; deferred scheduling to post-commit callbacks; added IUnitOfWorkManager dependency to bridge; removed UoW passing from cancellation/failure handlers
DbContext Event Routing & Priority
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs
Added LocalEventEnqueuer property to DbContext with priority-based routing (LocalEventEnqueuer first, eventSink fallback); wired DbContext to EfCoreLocalTransaction; implemented EnqueueEvents in transaction source; added lifecycle cleanup in DisposeAsync
UnitOfWork Commit-Phase Event Dispatch
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs
Added SaveChangesAsync call before domain event dispatch in CommitAsync to ensure all changes persisted prior to outbox/direct publish strategy

Sequence Diagram

sequenceDiagram
    participant App as Application/DaprService
    participant UoW as UnitOfWork
    participant DbCtx as DbContext
    participant LocalTx as EfCoreLocalTransaction
    participant EventMgr as EventEnqueuer
    participant Sink as EventSink

    rect rgb(200, 220, 240)
    Note over App,Sink: New Transactional Event Flow
    end

    App->>UoW: BeginAsync(RequiresNew)
    UoW->>DbCtx: Create/Initialize
    DbCtx->>LocalTx: Set LocalEventEnqueuer
    
    App->>DbCtx: Entity Updates
    App->>DbCtx: SaveChangesAsync()
    
    rect rgb(220, 240, 220)
    Note over DbCtx,LocalTx: SaveChanges Phase - Events Collected
    end
    
    DbCtx->>LocalTx: Collect Domain Events
    DbCtx->>LocalTx: EnqueueEvents(collected)
    LocalTx->>LocalTx: Store in Local Collection
    DbCtx->>EventMgr: PublishDomainEventsToSink()
    
    alt LocalEventEnqueuer Available
        EventMgr->>LocalTx: Enqueue via LocalEventEnqueuer
    else Fallback
        EventMgr->>Sink: Enqueue via eventSink
    end
    
    rect rgb(240, 220, 220)
    Note over UoW,EventMgr: Commit Phase
    end
    
    App->>UoW: CommitAsync()
    UoW->>DbCtx: Commit Transaction
    UoW->>EventMgr: Dispatch Enqueued Events
    EventMgr->>Sink: Publish Events
    UoW->>App: Complete

    rect rgb(240, 240, 220)
    Note over UoW,LocalTx: Post-Commit Cleanup
    end
    
    App->>LocalTx: DisposeAsync()
    LocalTx->>DbCtx: Clear LocalEventEnqueuer Link
    LocalTx->>LocalTx: Dispose Transaction
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Areas requiring extra attention:

  • EfCoreTransactionSource.cs: Verify DbContext.LocalEventEnqueuer lifecycle management, disposal cleanup, and field initialization (_context reference)
  • AetherDbContext.cs: Confirm priority-based routing logic is correct and backward compatibility with eventSink fallback works as intended
  • BackgroundJobService.cs & DaprJobExecutionBridge.cs: Validate transactional scope correctness, race condition prevention between DB write and scheduler, and OnCompleted callback timing
  • Event enqueuing during SaveChanges: Ensure domain events collected during SaveChanges are properly routed and not lost if exception occurs
  • UnitOfWork composition: Verify SaveChangesAsync call in CompositeUnitOfWork.CommitAsync doesn't duplicate or conflict with entity updates

Possibly related PRs

  • 15 support local domain events #16: Both PRs directly modify domain-event and transaction plumbing (AetherDbContext, LocalEventEnqueuer interface, and EF Core transaction sources) to refactor event routing and enqueuing during SaveChanges and commit flows.

Suggested labels

enhancement

Suggested reviewers

  • middt
  • tsimsekburgan

Poem

🐰 Events now flow with transactional grace,
Deferring their launch to the right time and place,
With queues in the scope and priority's art,
Each domain event plays its committed part! 🌱✨

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch hateoas-pagination

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 90b2c5e and 1c2a655.

📒 Files selected for processing (9)
  • .gitignore (1 hunks)
  • framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs (1 hunks)
  • framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs (1 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs (8 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs (5 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs (2 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs (3 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs (1 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs (3 hunks)

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @yilmaztayfun, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the reliability and consistency of event routing and background job processing within the system. By introducing a direct event enqueuing mechanism from DbContext to local transactions and refining the Unit of Work management for background jobs, it addresses potential race conditions and ensures that domain events and job scheduling are tightly coupled with transaction commits. These changes lead to a more robust and predictable event-driven architecture.

Highlights

  • Direct Event Routing for DbContext: Introduced ILocalTransactionEventEnqueuer to allow DbContext to directly enqueue domain events to its owning local transaction, ensuring events are dispatched to the correct Unit of Work regardless of the ambient context.
  • Robust Background Job Scheduling: Refactored BackgroundJobService to schedule jobs using OnCompleted callbacks of the Unit of Work, ensuring jobs are only scheduled after the transaction is fully committed and persisted to the database, preventing race conditions. It also now uses a RequiresNew Unit of Work scope for job enqueueing.
  • Explicit Transaction Management in Dapr Job Execution: The DaprJobExecutionBridge now explicitly manages its own Unit of Work for job execution, ensuring atomicity and proper error handling during job processing.
  • Decoupled Job Dispatcher: Removed the JobDispatcher's reliance on ambient Unit of Work, making it more independent and reducing potential issues related to UoW scope.
  • Improved Domain Event Persistence: Modified CompositeUnitOfWork to call SaveChangesAsync before dispatching domain events, guaranteeing that all changes are persisted to the database before events are processed.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces several excellent improvements to event routing and transaction handling, especially concerning background jobs. The new ILocalTransactionEventEnqueuer provides a much more reliable way to route domain events to the correct Unit of Work. Using OnCompleted for scheduling background jobs is a smart move to prevent race conditions. The explicit transaction management in DaprJobExecutionBridge and removing ambient UoW reliance in JobDispatcher are also great changes for robustness. My review includes one high-severity suggestion regarding cancellation token handling within the OnCompleted callback to prevent potential inconsistencies in job scheduling.

Comment on lines +105 to +112
uow.OnCompleted(async _ =>
{
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);

logger.LogInformation(
"Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);
});

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The OnCompleted handler captures the cancellationToken from the EnqueueAsync method's scope. This could lead to an issue where if the original request is cancelled after the transaction has been committed but before the OnCompleted handler finishes, the job scheduling via jobScheduler.ScheduleAsync would also be cancelled. This would leave the system in an inconsistent state where the BackgroundJobInfo entity is persisted in the database, but the job is not actually scheduled to run. Since the OnCompleted handler executes after the commit is successful, its operation should be decoupled from the cancellation of the original request. Using CancellationToken.None will ensure the scheduling proceeds regardless of the original request's status post-commit, making the process more robust.

            uow.OnCompleted(async _ =>
            {
                // Use CancellationToken.None to ensure scheduling is not cancelled
                // by the original request's token after the UoW has committed.
                await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, CancellationToken.None);
                
                logger.LogInformation(
                    "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
                    handlerName, jobName, jobId);
            });

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • In BackgroundJobService.EnqueueAsync the OnCompleted callback captures the original CancellationToken; consider using a non-cancellable token (or explicitly documenting the behavior) so that post-commit scheduling isn’t skipped simply because the caller cancelled after persistence succeeded.
  • BackgroundJobService.UpdateAsync still schedules the updated job before persisting the new schedule; to avoid the same race you fixed in EnqueueAsync, consider moving scheduling into an OnCompleted callback after SaveChanges/commit.
  • In JobDispatcher, the success path uses UpdateStatusWithinUowAsync (with a new UoW per status change) while the cancellation/failure paths now update status without any explicit UoW; it may be worth aligning these paths so all status transitions follow a consistent transactional pattern.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In BackgroundJobService.EnqueueAsync the OnCompleted callback captures the original CancellationToken; consider using a non-cancellable token (or explicitly documenting the behavior) so that post-commit scheduling isn’t skipped simply because the caller cancelled after persistence succeeded.
- BackgroundJobService.UpdateAsync still schedules the updated job before persisting the new schedule; to avoid the same race you fixed in EnqueueAsync, consider moving scheduling into an OnCompleted callback after SaveChanges/commit.
- In JobDispatcher, the success path uses UpdateStatusWithinUowAsync (with a new UoW per status change) while the cancellation/failure paths now update status without any explicit UoW; it may be worth aligning these paths so all status transitions follow a consistent transactional pattern.

## Individual Comments

### Comment 1
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs:105-107` </location>
<code_context>
+            await uow.SaveChangesAsync(cancellationToken);
+            // Register scheduler to run AFTER commit is fully persisted to DB
+            // This prevents race condition where scheduler triggers before DB write completes
+            uow.OnCompleted(async _ =>
+            {
+                await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
+                
+                logger.LogInformation(
</code_context>

<issue_to_address>
**issue (bug_risk):** Reusing the original cancellation token in OnCompleted can leave the job persisted but never scheduled.

Because the callback runs after the transaction commits, reusing the `EnqueueAsync` `cancellationToken` means a cancellation between commit and callback can prevent scheduling while the job is already persisted. Use `CancellationToken.None` (or a dedicated timeout token) in `OnCompleted`, and reserve the original token for operations up to and including `SaveChangesAsync`.
</issue_to_address>

### Comment 2
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs:61-64` </location>
<code_context>
         IDbContextTransaction? transaction)
         : ILocalTransaction, ITransactionalLocal, ISupportsSaveChanges, IAsyncDisposable, ILocalTransactionEventEnqueuer
     {
+        private readonly AetherDbContext<TDbContext> _context = context;
         private IDbContextTransaction? _transaction = transaction;
         private readonly List<DomainEventEnvelope> _collectedEvents = new();
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Clearing LocalEventEnqueuer in DisposeAsync could race if the context outlives the transaction or is reused.

`EfCoreLocalTransaction` holds `_context` and sets `dbContext.LocalEventEnqueuer = localTx` on creation, then always clears it in `DisposeAsync`. If an `AetherDbContext` instance can later be bound to a different transaction, disposing the old one would null out `LocalEventEnqueuer` for the new transaction. If that reuse is possible, guard the clear (only null if it still equals this instance) or adjust the ownership model to prevent cross-transaction interference.

Suggested implementation:

```csharp
        private readonly AetherDbContext<TDbContext> _context = context;
        private IDbContextTransaction? _transaction = transaction;
        private readonly List<DomainEventEnvelope> _collectedEvents = new();

```

```csharp
            _transaction = isolationLevel.HasValue
                ? await _context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken)
                : await _context.Database.BeginTransactionAsync(cancellationToken);
        }

```

```csharp
        public async ValueTask DisposeAsync()
        {
            // Only clear LocalEventEnqueuer if it still belongs to this transaction instance
            if (ReferenceEquals(_context.LocalEventEnqueuer, this))
            {
                _context.LocalEventEnqueuer = null;
            }

            if (_transaction != null)
            {
                await _transaction.DisposeAsync();
                _transaction = null;
            }
        }

```

If `DisposeAsync` in your file has a slightly different structure or additional logic, apply the same pattern at the point where `LocalEventEnqueuer` is cleared:

1. Wrap the assignment to `LocalEventEnqueuer = null` in a `ReferenceEquals(_context.LocalEventEnqueuer, this)` check.
2. Ensure all usages of the constructor parameter `context` inside `EfCoreLocalTransaction` are switched to the `_context` field for consistency, e.g. `context.LocalEventEnqueuer``_context.LocalEventEnqueuer`.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +105 to +107
uow.OnCompleted(async _ =>
{
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Reusing the original cancellation token in OnCompleted can leave the job persisted but never scheduled.

Because the callback runs after the transaction commits, reusing the EnqueueAsync cancellationToken means a cancellation between commit and callback can prevent scheduling while the job is already persisted. Use CancellationToken.None (or a dedicated timeout token) in OnCompleted, and reserve the original token for operations up to and including SaveChangesAsync.

Comment on lines +61 to 64
private readonly AetherDbContext<TDbContext> _context = context;
private IDbContextTransaction? _transaction = transaction;
private readonly List<DomainEventEnvelope> _collectedEvents = new();

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Clearing LocalEventEnqueuer in DisposeAsync could race if the context outlives the transaction or is reused.

EfCoreLocalTransaction holds _context and sets dbContext.LocalEventEnqueuer = localTx on creation, then always clears it in DisposeAsync. If an AetherDbContext instance can later be bound to a different transaction, disposing the old one would null out LocalEventEnqueuer for the new transaction. If that reuse is possible, guard the clear (only null if it still equals this instance) or adjust the ownership model to prevent cross-transaction interference.

Suggested implementation:

        private readonly AetherDbContext<TDbContext> _context = context;
        private IDbContextTransaction? _transaction = transaction;
        private readonly List<DomainEventEnvelope> _collectedEvents = new();
            _transaction = isolationLevel.HasValue
                ? await _context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken)
                : await _context.Database.BeginTransactionAsync(cancellationToken);
        }
        public async ValueTask DisposeAsync()
        {
            // Only clear LocalEventEnqueuer if it still belongs to this transaction instance
            if (ReferenceEquals(_context.LocalEventEnqueuer, this))
            {
                _context.LocalEventEnqueuer = null;
            }

            if (_transaction != null)
            {
                await _transaction.DisposeAsync();
                _transaction = null;
            }
        }

If DisposeAsync in your file has a slightly different structure or additional logic, apply the same pattern at the point where LocalEventEnqueuer is cleared:

  1. Wrap the assignment to LocalEventEnqueuer = null in a ReferenceEquals(_context.LocalEventEnqueuer, this) check.
  2. Ensure all usages of the constructor parameter context inside EfCoreLocalTransaction are switched to the _context field for consistency, e.g. context.LocalEventEnqueuer_context.LocalEventEnqueuer.

@sonarqubecloud
Copy link

sonarqubecloud bot commented Dec 9, 2025

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants