Skip to content

Conversation

@yilmaztayfun
Copy link
Contributor

@yilmaztayfun yilmaztayfun commented Dec 9, 2025

Summary by Sourcery

Improve background job execution reliability and domain event routing by tightening unit-of-work boundaries and linking EF Core transactions directly to their owning DbContexts.

Bug Fixes:

  • Ensure background jobs are only scheduled after their database records are fully committed by registering scheduling in a unit-of-work completion callback.
  • Guarantee job execution runs within an explicit unit of work when triggered via the Dapr job execution bridge, including proper commit/rollback on success or failure.
  • Prevent loss or misrouting of domain events when SaveChanges is called directly on AetherDbContext by routing events through a local transaction enqueuer tied to the owning unit of work.

Enhancements:

  • Introduce a LocalEventEnqueuer hook on AetherDbContext and wire it from EfCoreTransactionSource so domain events are enqueued into the correct local transaction.
  • Simplify job status update logic in JobDispatcher to rely on the surrounding unit of work instead of creating nested units of work.
  • Trigger SaveChanges on all tracked contexts before composite unit of work commit to ensure consistent state before domain event dispatch.

Summary by CodeRabbit

Release Notes

  • Refactor
    • Improved background job execution with enhanced transaction handling to ensure reliable job processing and event dispatching.
    • Refactored domain event routing to prioritize local transaction enqueueing, providing better control over event dispatch timing and order.
    • Streamlined unit-of-work management in job dispatchers for clearer transactional boundaries.

✏️ Tip: You can customize this high-level summary in your review settings.

…jobs

Introduces ILocalTransactionEventEnqueuer to enable direct event routing from DbContext to the owning local transaction, ensuring domain events are dispatched to the correct Unit of Work. Refactors background job scheduling to use OnCompleted for post-commit scheduling, adds explicit transaction management in DaprJobExecutionBridge, and removes ambient UoW reliance in JobDispatcher. Updates .gitignore to exclude ai-docs/.
Improve event routing and transaction handling in UoW and background …
@yilmaztayfun yilmaztayfun requested review from a team as code owners December 9, 2025 11:07
@yilmaztayfun yilmaztayfun requested review from safakcakir and removed request for a team December 9, 2025 11:07
@coderabbitai
Copy link

coderabbitai bot commented Dec 9, 2025

Caution

Review failed

The pull request is closed.

Note

.coderabbit.yaml has unrecognized properties

CodeRabbit is using all valid settings from your configuration. Unrecognized properties (listed below) have been ignored and may indicate typos or deprecated fields that can be removed.

⚠️ Parsing warnings (1)
Validation error: Unrecognized key(s) in object: 'review'
⚙️ Configuration instructions
  • Please see the configuration documentation for more information.
  • You can also validate your configuration using the online YAML validator.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Walkthrough

Introduces a new ILocalTransactionEventEnqueuer interface enabling domain events to be routed directly into local transactions. The AetherDbContext receives a LocalEventEnqueuer property to support this routing, while EfCoreTransactionSource establishes the connection between DbContext and local transactions. Background job processing is refactored to defer scheduling via transaction callbacks, and job dispatch logic removes explicit unit-of-work management.

Changes

Cohort / File(s) Summary
Core Event Enqueuing Abstraction
framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs
New public interface defining EnqueueEvents method to enable DbContext to route domain events collected during SaveChanges directly into the owning local transaction.
DbContext & Transaction Infrastructure
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs
Adds LocalEventEnqueuer property to AetherDbContext for priority-based event routing. EfCoreTransactionSource establishes bidirectional link between DbContext and local transaction. CompositeUnitOfWork ensures SaveChangesAsync completes before event dispatch logic.
Background Job Processing
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
BackgroundJobService refactors UoW usage to defer job scheduling via OnCompleted callback. DaprJobExecutionBridge introduces UoW scope around job execution flow. JobDispatcher removes explicit UoW management from dispatch paths, delegating to external callers.
Minor Formatting
.gitignore, framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs
Adds ai-docs/ ignore rule; adjusts whitespace in UnitOfWorkAttribute.

Sequence Diagram

sequenceDiagram
    actor Caller
    participant UoW as Unit of Work
    participant DbCtx as DbContext
    participant Txn as EfCoreLocalTransaction
    participant EventSink as Domain Event Sink

    Caller->>UoW: BeginAsync
    UoW->>Txn: CreateTransaction
    Txn->>DbCtx: Set LocalEventEnqueuer
    
    Caller->>DbCtx: SaveChanges (entity operations)
    DbCtx->>DbCtx: Collect domain events
    DbCtx->>Txn: EnqueueEvents via LocalEventEnqueuer
    Txn->>Txn: Store events in local transaction
    
    Caller->>UoW: SaveChangesAsync
    DbCtx->>DbCtx: Publish domain events
    
    rect rgb(180, 220, 180)
    Note over Caller,EventSink: Event Dispatch Strategy
    alt LocalEventEnqueuer set
        DbCtx->>Txn: Retrieve enqueued events
        Txn->>EventSink: Dispatch events
    else Ambient EventSink
        DbCtx->>EventSink: Dispatch events
    end
    end
    
    Caller->>UoW: CommitAsync
    UoW->>Txn: Commit
    Txn->>Txn: Clear DbContext link
    
    Caller->>Caller: Continue with committed state
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Key areas requiring attention:
    • Transaction semantics and event ordering: ensure events are properly enqueued before SaveChanges completes and that dispatch order respects the new priority logic
    • Background job scheduling deferred execution: verify OnCompleted callbacks execute at the correct transaction boundary
    • Job dispatcher flow: confirm removal of explicit UoW management doesn't break error handling or status tracking
    • DbContext-to-transaction bidirectional link: verify proper cleanup on disposal and no reference leaks

Possibly related PRs

  • PR #16: Implements transaction-aware event storage and dispatch alongside DbContext event orchestration; complements this PR's introduction of ILocalTransactionEventEnqueuer and local transaction routing mechanism.

Suggested labels

enhancement

Suggested reviewers

  • tsimsekburgan
  • ikarakayali

Poem

🐰 Domain events hop through transactions now,
Enqueued in local scope, all in a row,
No more ambiguous paths—the enqueuer's the way,
Clear routing and callbacks bring order to the fray! ✨

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch release-v1.0

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 428357e and 86a67a5.

📒 Files selected for processing (9)
  • .gitignore (1 hunks)
  • framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs (1 hunks)
  • framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs (1 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs (8 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs (5 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs (2 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs (3 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs (1 hunks)
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs (3 hunks)

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sourcery-ai
Copy link

sourcery-ai bot commented Dec 9, 2025

Reviewer's Guide

Aligns background job scheduling/execution and EF Core unit-of-work handling so jobs are only scheduled after durable commit, job execution runs inside a unit of work, and domain events are reliably routed to the correct local transaction via a new DbContext-to-local-transaction linkage.

Sequence diagram for background job enqueue and post-commit scheduling

sequenceDiagram
    actor Caller
    participant BackgroundJobService
    participant UowManager
    participant Uow
    participant JobStore
    participant JobScheduler
    participant EventSerializer

    Caller->>BackgroundJobService: EnqueueAsync(handlerName, jobName, payload, schedule, metadata)
    BackgroundJobService->>EventSerializer: Serialize(CloudEventEnvelope)
    EventSerializer-->>BackgroundJobService: payloadBytes

    BackgroundJobService->>UowManager: BeginAsync(options RequiresNew)
    UowManager-->>BackgroundJobService: Uow

    BackgroundJobService->>JobStore: SaveAsync(jobInfo)
    JobStore-->>BackgroundJobService: (saved)

    BackgroundJobService->>Uow: SaveChangesAsync()
    Uow-->>BackgroundJobService: (saved)

    BackgroundJobService->>Uow: OnCompleted(handler)
    Uow-->>BackgroundJobService: (registered)

    BackgroundJobService->>Uow: CommitAsync()
    Uow-->>BackgroundJobService: (committed)

    Uow-->>JobScheduler: ScheduleAsync(handlerName, jobName, schedule, payloadBytes)
    JobScheduler-->>Uow: (scheduled)

    BackgroundJobService-->>Caller: jobId

    alt failure during enqueue
        BackgroundJobService->>Uow: RollbackAsync()
        Uow-->>BackgroundJobService: (rolled back)
        BackgroundJobService-->>Caller: exception
    end
Loading

Sequence diagram for Dapr job execution within a unit of work

sequenceDiagram
    participant DaprScheduler
    participant DaprJobExecutionBridge
    participant UnitOfWorkManager
    participant Uow
    participant EventSerializer
    participant JobStore
    participant JobDispatcher
    participant Logger

    DaprScheduler->>DaprJobExecutionBridge: ExecuteAsync(jobName, payload)

    DaprJobExecutionBridge->>EventSerializer: Deserialize(CloudEventEnvelope)
    EventSerializer-->>DaprJobExecutionBridge: envelope or null

    alt envelope
        DaprJobExecutionBridge->>EventSerializer: Serialize(envelope.Data)
        EventSerializer-->>DaprJobExecutionBridge: argsPayload
    else raw payload
        DaprJobExecutionBridge-->>DaprJobExecutionBridge: argsPayload = payload
    end

    DaprJobExecutionBridge->>UnitOfWorkManager: BeginAsync()
    UnitOfWorkManager-->>DaprJobExecutionBridge: Uow

    DaprJobExecutionBridge->>JobStore: GetByJobNameAsync(jobName)
    JobStore-->>DaprJobExecutionBridge: jobInfo or null

    alt jobInfo is null
        DaprJobExecutionBridge->>Logger: LogError(job not found)
        DaprJobExecutionBridge->>Uow: RollbackAsync()
        Uow-->>DaprJobExecutionBridge: (rolled back)
        DaprJobExecutionBridge-->>DaprScheduler: exception
    else jobInfo found
        DaprJobExecutionBridge->>JobDispatcher: DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload)
        JobDispatcher-->>DaprJobExecutionBridge: (handled)

        DaprJobExecutionBridge->>Uow: CommitAsync()
        Uow-->>DaprJobExecutionBridge: (committed)
        DaprJobExecutionBridge-->>DaprScheduler: (completed)
    end

    alt any exception
        DaprJobExecutionBridge->>Logger: LogError(ex)
        DaprJobExecutionBridge->>Uow: RollbackAsync()
        Uow-->>DaprJobExecutionBridge: (rolled back)
        DaprJobExecutionBridge-->>DaprScheduler: exception
    end
Loading

Sequence diagram for domain event routing from DbContext to local transaction

sequenceDiagram
    participant AppCode
    participant EfCoreTransactionSource
    participant AetherDbContext
    participant EfCoreLocalTransaction
    participant CompositeUnitOfWork
    participant DomainEventSink

    AppCode->>CompositeUnitOfWork: Begin()
    CompositeUnitOfWork->>EfCoreTransactionSource: CreateTransactionAsync(context)
    EfCoreTransactionSource->>AetherDbContext: (uses context)
    EfCoreTransactionSource-->>CompositeUnitOfWork: EfCoreLocalTransaction
    EfCoreTransactionSource->>AetherDbContext: set LocalEventEnqueuer(EfCoreLocalTransaction)

    AppCode->>AetherDbContext: Modify entities
    AppCode->>AetherDbContext: SaveChanges()

    AetherDbContext-->>AetherDbContext: CollectDomainEvents()
    alt LocalEventEnqueuer is set
        AetherDbContext->>EfCoreLocalTransaction: EnqueueEvents(domainEvents)
        AetherDbContext-->>AetherDbContext: ClearDomainEvents()
    else no LocalEventEnqueuer
        alt eventSink is set
            AetherDbContext->>DomainEventSink: EnqueueDomainEvents(domainEvents)
            AetherDbContext-->>AetherDbContext: ClearDomainEvents()
        else no event sink
            AetherDbContext-->>AetherDbContext: events remain for manual handling
        end
    end

    AppCode->>CompositeUnitOfWork: CommitAsync()
    CompositeUnitOfWork->>CompositeUnitOfWork: SaveChangesAsync() and dispatch collected events

    CompositeUnitOfWork->>EfCoreLocalTransaction: DisposeAsync()
    EfCoreLocalTransaction->>AetherDbContext: set LocalEventEnqueuer(null)
    EfCoreLocalTransaction-->>CompositeUnitOfWork: (disposed)
Loading

Updated class diagram for DbContext, local transaction, and background job components

classDiagram
    class BackgroundJobService {
        +EnqueueAsync~TPayload~(string handlerName, string jobName, TPayload payload, string schedule, IDictionary~string,object~ metadata, CancellationToken cancellationToken) Task~Guid~
        +UpdateAsync(Guid id, string newSchedule, CancellationToken cancellationToken) Task
        +DeleteAsync(Guid id, CancellationToken cancellationToken) Task~bool~
    }

    class DaprJobExecutionBridge {
        +ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken) Task
        -ReadOnlyMemory~byte~? TryExtractPayload(ReadOnlyMemory~byte~ payload)
    }

    class JobDispatcher {
        +DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken) Task
        -UpdateStatusWithinUowAsync(Guid jobId, BackgroundJobStatus status, string message, CancellationToken cancellationToken) Task
        -HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) Task
        -HandleJobFailureAsync(Guid jobId, Exception exception, CancellationToken cancellationToken) Task
    }

    class AetherDbContext~TDbContext~ {
        +ILocalTransactionEventEnqueuer LocalEventEnqueuer
        +CollectDomainEvents() List~DomainEventEnvelope~
        +ClearDomainEvents() void
        -PublishDomainEventsToSink() void
        +SaveChanges() int
        +SaveChangesAsync(CancellationToken cancellationToken) Task~int~
    }

    class EfCoreTransactionSource~TDbContext~ {
        +CreateTransactionAsync(AetherDbContext~TDbContext~ dbContext, IsolationLevel? isolationLevel, CancellationToken cancellationToken) Task~ILocalTransaction~
    }

    class EfCoreLocalTransaction {
        +EnsureTransactionAsync(IsolationLevel? isolationLevel, CancellationToken cancellationToken) Task
        +CommitAsync(CancellationToken cancellationToken) Task
        +RollbackAsync(CancellationToken cancellationToken) Task
        +ClearCollectedEvents() void
        +SaveChangesAsync(CancellationToken cancellationToken) Task
        +DisposeAsync() ValueTask
        +EnqueueEvents(IEnumerable~DomainEventEnvelope~ events) void
        -AetherDbContext~TDbContext~ _context
        -IDbContextTransaction _transaction
        -List~DomainEventEnvelope~ _collectedEvents
    }

    class CompositeUnitOfWork {
        +CommitAsync(CancellationToken cancellationToken) Task
        +SaveChangesAsync(CancellationToken cancellationToken) Task
    }

    class ILocalTransactionEventEnqueuer {
        <<interface>>
        +EnqueueEvents(IEnumerable~DomainEventEnvelope~ events) void
    }

    class IUnitOfWorkManager {
        <<interface>>
        +BeginAsync(UnitOfWorkOptions options, CancellationToken cancellationToken) Task~IUnitOfWork~
        +BeginAsync(CancellationToken cancellationToken) Task~IUnitOfWork~
    }

    class IUnitOfWork {
        <<interface>>
        +SaveChangesAsync(CancellationToken cancellationToken) Task
        +CommitAsync(CancellationToken cancellationToken) Task
        +RollbackAsync(CancellationToken cancellationToken) Task
        +OnCompleted(Func~IUnitOfWork,Task~ handler) void
    }

    class IJobStore {
        <<interface>>
        +SaveAsync(BackgroundJobInfo jobInfo, CancellationToken cancellationToken) Task
        +UpdateStatusAsync(Guid id, BackgroundJobStatus status, DateTime timestamp, string reason, CancellationToken cancellationToken) Task
        +GetByJobNameAsync(string jobName, CancellationToken cancellationToken) Task~BackgroundJobInfo~
    }

    class IJobScheduler {
        <<interface>>
        +ScheduleAsync(string handlerName, string jobName, string schedule, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken) Task
        +DeleteAsync(string handlerName, string jobName, CancellationToken cancellationToken) Task
    }

    class IEventSerializer {
        <<interface>>
        +Serialize(object value) byte[]
        +Deserialize~T~(ReadOnlyMemory~byte~ payload) T
    }

    BackgroundJobService --> IUnitOfWorkManager : uses
    BackgroundJobService --> IJobStore : uses
    BackgroundJobService --> IJobScheduler : uses
    BackgroundJobService --> IEventSerializer : uses
    BackgroundJobService --> IUnitOfWork : manages

    DaprJobExecutionBridge --> IUnitOfWorkManager : uses
    DaprJobExecutionBridge --> IJobStore : uses
    DaprJobExecutionBridge --> JobDispatcher : uses
    DaprJobExecutionBridge --> IEventSerializer : uses

    JobDispatcher --> IJobStore : uses
    JobDispatcher --> IUnitOfWorkManager : uses

    EfCoreTransactionSource~TDbContext~ --> AetherDbContext~TDbContext~ : creates transaction for
    EfCoreTransactionSource~TDbContext~ --> EfCoreLocalTransaction : returns

    EfCoreLocalTransaction ..|> ILocalTransactionEventEnqueuer : implements

    AetherDbContext~TDbContext~ --> ILocalTransactionEventEnqueuer : LocalEventEnqueuer

    CompositeUnitOfWork --> EfCoreLocalTransaction : coordinates

    IUnitOfWorkManager --> IUnitOfWork : creates

    IJobStore --> BackgroundJobInfo : persists
    JobDispatcher --> BackgroundJobInfo : reads

    IJobScheduler --> BackgroundJobInfo : schedules

    IUnitOfWork <|.. EfCoreLocalTransaction : participates via owning UoW
Loading

File-Level Changes

Change Details Files
Make background job enqueueing transactionally safe by scheduling only after the persistence transaction has fully committed.
  • Wrap job creation in a new-scoped unit of work instead of using the ambient one.
  • Explicitly call SaveChangesAsync on the unit of work before commit when saving the job entity.
  • Use the unit of work OnCompleted callback to schedule the job only after the commit completes, removing direct pre-commit scheduling.
  • Improve logging around enqueueing and scheduling operations.
  • Keep update/delete flows mostly intact but adjust some scheduler and store calls formatting/logging.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
Ensure Dapr job execution participates in a unit of work and rolls back on failure.
  • Inject IUnitOfWorkManager into the Dapr job execution bridge.
  • Begin a unit of work around job metadata loading and handler dispatch.
  • Commit the unit of work on successful dispatch, rollback on any exception.
  • Keep envelope/schema extraction logic but wrap dispatcher interactions in the transactional scope.
  • Improve error logging for missing jobs and dispatch failures.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
Introduce a direct linkage from EF Core DbContext to its owning local transaction to route domain events correctly.
  • Add a LocalEventEnqueuer property to AetherDbContext for transaction-local event routing.
  • Change PublishDomainEventsToSink to first collect events and, if any, send them to LocalEventEnqueuer when available, falling back to the existing eventSink for backward compatibility.
  • Create a new public ILocalTransactionEventEnqueuer interface in the core UoW namespace for DbContext-to-transaction communication.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs
framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs
Wire EF Core local transactions to DbContext for domain event collection and clean up on disposal.
  • When creating an EfCoreLocalTransaction, set the associated DbContext LocalEventEnqueuer to the transaction instance.
  • Store the DbContext in a private field within EfCoreLocalTransaction and use it instead of the constructor parameter directly.
  • On transaction disposal, clear the DbContext LocalEventEnqueuer to avoid stale references.
  • Adjust transaction creation/begin logic to use the stored DbContext field.
  • Move the ILocalTransactionEventEnqueuer interface to the shared core namespace and remove the old internal declaration.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs
Rely on an existing surrounding unit of work for job execution status changes instead of creating an inner one.
  • Remove the BeginAsync/CommitAsync/RollbackAsync unit of work handling from JobDispatcher.ExecuteAsync.
  • Update cancellation and failure handlers to operate without an explicit IUnitOfWork parameter, assuming an ambient/surrounding UoW will manage persistence.
  • Retain status update calls but simplify control flow and logging around failure/cancellation.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
Ensure CompositeUnitOfWork persists state before dispatching domain events.
  • Call SaveChangesAsync at the start of CompositeUnitOfWork.CommitAsync before resolving the domain event dispatch strategy and dispatching events, ensuring all state is flushed to the database prior to event handling.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@yilmaztayfun yilmaztayfun merged commit aadff7b into master Dec 9, 2025
4 of 6 checks passed
@gemini-code-assist
Copy link

Summary of Changes

Hello @yilmaztayfun, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the reliability and correctness of domain event dispatching and background job processing within the framework. By establishing a direct link between DbContext and its local transaction for event enqueuing, it ensures events are always routed to the correct Unit of Work. Furthermore, background job scheduling has been made more robust by guaranteeing persistence before scheduling, and job execution is now properly isolated within its own transactional boundaries, leading to a more stable and predictable system.

Highlights

  • Direct Event Routing for DbContext: Introduced ILocalTransactionEventEnqueuer to allow DbContext instances to directly enqueue domain events to their owning local transaction, bypassing the ambient Unit of Work context for more reliable event dispatching.
  • Robust Background Job Scheduling: Modified BackgroundJobService to ensure jobs are scheduled only after their persistence to the database is fully committed, preventing race conditions where a scheduler might trigger a job before its data is saved.
  • Transactional Integrity for Job Execution: The DaprJobExecutionBridge now wraps job execution within its own dedicated Unit of Work, ensuring that the processing of background jobs maintains transactional integrity.
  • Simplified Job Dispatcher: Refactored JobDispatcher to remove explicit Unit of Work management, allowing it to rely on the transactional context provided by its callers, such as the DaprJobExecutionBridge.
  • Git Ignore Update: Added ai-docs/ to the .gitignore file to exclude AI-generated documentation from version control.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new mechanism for handling domain events within the Unit of Work (UoW) by establishing a direct link between DbContext and its local transaction. A new interface, ILocalTransactionEventEnqueuer, was added to facilitate this, allowing AetherDbContext to directly enqueue events to the correct transaction via its LocalEventEnqueuer property, which is set by EfCoreTransactionSource. The CompositeUnitOfWork.CommitAsync method was updated to include SaveChangesAsync to ensure changes are persisted before event dispatch. The BackgroundJobService.EnqueueAsync method was refactored to use a RequiresNew UoW scope and to schedule jobs within uow.OnCompleted to prevent race conditions, ensuring jobs are scheduled only after database persistence; a review comment noted a redundant SaveChangesAsync call in this method due to the CompositeUnitOfWork change. Additionally, the DaprJobExecutionBridge now wraps job execution in its own UoW, leading to the removal of internal UoW management from JobDispatcher. A review comment highlighted that job status updates for cancellation and failure in JobDispatcher would be rolled back, recommending these updates be performed in a new, separate transaction using UpdateStatusWithinUowAsync.

Comment on lines +132 to 143
private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
{
try
{
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled,
clock.UtcNow, "Job was cancelled", cancellationToken);
await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Cancelled");
await uow.RollbackAsync(cancellationToken);
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The status update to Cancelled in this method will be rolled back. The DispatchAsync method calls this from within a catch block, but the DaprJobExecutionBridge (the caller of DispatchAsync) will roll back the entire Unit of Work upon catching the exception. This means the status update is lost, and the job might remain in a Running state in the database.

To fix this, the status update should be performed in a new, separate transaction. You can achieve this by using the existing helper method UpdateStatusWithinUowAsync.

    private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
    {
        await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Cancelled, "Job was cancelled", cancellationToken);
    }

await jobStore.SaveAsync(jobInfo, cancellationToken);

// Commit transaction
await uow.SaveChangesAsync(cancellationToken);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This call to uow.SaveChangesAsync() is redundant. The CompositeUnitOfWork.CommitAsync() method, which is called later, now includes a call to SaveChangesAsync() at the beginning of its execution. Removing this line will avoid the duplicate call without changing the behavior.

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and found some issues that need to be addressed.

  • In BackgroundJobService.EnqueueAsync, the OnCompleted callback reuses the original cancellationToken, which may be canceled after the transaction commits and prevent the post-commit scheduling; consider using a separate token (or CancellationToken.None) for the scheduler call so that a successfully committed job cannot be silently dropped.
  • With the removal of the inner unit of work in JobDispatcher.ExecuteWithinUowAsync, all status updates now rely on an ambient UoW; it would be safer to either assert/guard that a current UoW exists when this method is used or fall back to creating one to avoid non-transactional status changes when called without a surrounding UoW.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `BackgroundJobService.EnqueueAsync`, the `OnCompleted` callback reuses the original `cancellationToken`, which may be canceled after the transaction commits and prevent the post-commit scheduling; consider using a separate token (or `CancellationToken.None`) for the scheduler call so that a successfully committed job cannot be silently dropped.
- With the removal of the inner unit of work in `JobDispatcher.ExecuteWithinUowAsync`, all status updates now rely on an ambient UoW; it would be safer to either assert/guard that a current UoW exists when this method is used or fall back to creating one to avoid non-transactional status changes when called without a surrounding UoW.

## Individual Comments

### Comment 1
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs:105-107` </location>
<code_context>
+            await uow.SaveChangesAsync(cancellationToken);
+            // Register scheduler to run AFTER commit is fully persisted to DB
+            // This prevents race condition where scheduler triggers before DB write completes
+            uow.OnCompleted(async _ =>
+            {
+                await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
+                
+                logger.LogInformation(
</code_context>

<issue_to_address>
**issue (bug_risk):** Reconsider using the original CancellationToken in the OnCompleted callback.

If the original `cancellationToken` is canceled by the time `OnCompleted` runs (e.g., HTTP request aborted after the commit), `ScheduleAsync` and logging will be skipped even though the job was successfully persisted. That can leave jobs stored but never scheduled. Use `CancellationToken.None` (or a separate, clearly scoped token) for the post-commit scheduling/logging work, and reserve the original token for transactional operations only.
</issue_to_address>

### Comment 2
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs:53-62` </location>
<code_context>
-            var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
-            if (jobInfo == null)
+
+            await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
+            try
+            {
+                var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
+                if (jobInfo == null)
+                {
+                    logger.LogError("Job with name '{JobName}' not found in store", jobName);
+                    throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
+                }
+
+                // Dispatch to handler using the handler name from job entity
+                await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
+
+                await uow.CommitAsync(cancellationToken);
+            }
+            catch (Exception)
</code_context>

<issue_to_address>
**issue (bug_risk):** Status updates on job failure/cancellation are likely being rolled back with the Unit of Work.

Because the bridge now owns the outer UoW, the failure/cancellation path ends up like this:

- `JobDispatcher.ExecuteAsync` sets status to `Running` in its own `RequiresNew` UoW.
- On handler failure/cancellation it calls `HandleJobFailureAsync`/`HandleJobCancellationAsync`, which now rely on the bridge’s ambient UoW.
- The bridge then catches the exception and rolls back that UoW.

So the `Failed`/`Cancelled` updates are rolled back and the job remains `Running` (or older), unlike the previous implementation where those updates were committed in a local UoW.

I’d suggest either keeping failure/cancellation status updates in their own `RequiresNew` UoW, or adjusting the bridge so job metadata commits even when the dispatcher throws (e.g., separating handler side effects from job status persistence).
</issue_to_address>

### Comment 3
<location> `framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs:61-64` </location>
<code_context>
         IDbContextTransaction? transaction)
         : ILocalTransaction, ITransactionalLocal, ISupportsSaveChanges, IAsyncDisposable, ILocalTransactionEventEnqueuer
     {
+        private readonly AetherDbContext<TDbContext> _context = context;
         private IDbContextTransaction? _transaction = transaction;
         private readonly List<DomainEventEnvelope> _collectedEvents = new();
</code_context>

<issue_to_address>
**issue (bug_risk):** Clearing LocalEventEnqueuer in DisposeAsync can race with other local transactions on the same DbContext.

`EfCoreLocalTransaction` assigns `dbContext.LocalEventEnqueuer = localTx` on creation and clears it in `DisposeAsync`. With multiple local transactions on the same `DbContext` (nested/overlapping UoWs), a later-disposing older transaction can overwrite/null the `LocalEventEnqueuer` set by a newer one, breaking event routing for the active transaction.

To avoid this, either:
- only clear if it still points to this instance (e.g. `if (ReferenceEquals(_context.LocalEventEnqueuer, this)) ...`), or
- enforce and validate a single-local-transaction-per-context invariant.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +105 to +107
uow.OnCompleted(async _ =>
{
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Reconsider using the original CancellationToken in the OnCompleted callback.

If the original cancellationToken is canceled by the time OnCompleted runs (e.g., HTTP request aborted after the commit), ScheduleAsync and logging will be skipped even though the job was successfully persisted. That can leave jobs stored but never scheduled. Use CancellationToken.None (or a separate, clearly scoped token) for the post-commit scheduling/logging work, and reserve the original token for transactional operations only.

Comment on lines +53 to +62
await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
try
{
var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
if (jobInfo == null)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Status updates on job failure/cancellation are likely being rolled back with the Unit of Work.

Because the bridge now owns the outer UoW, the failure/cancellation path ends up like this:

  • JobDispatcher.ExecuteAsync sets status to Running in its own RequiresNew UoW.
  • On handler failure/cancellation it calls HandleJobFailureAsync/HandleJobCancellationAsync, which now rely on the bridge’s ambient UoW.
  • The bridge then catches the exception and rolls back that UoW.

So the Failed/Cancelled updates are rolled back and the job remains Running (or older), unlike the previous implementation where those updates were committed in a local UoW.

I’d suggest either keeping failure/cancellation status updates in their own RequiresNew UoW, or adjusting the bridge so job metadata commits even when the dispatcher throws (e.g., separating handler side effects from job status persistence).

Comment on lines +61 to 64
private readonly AetherDbContext<TDbContext> _context = context;
private IDbContextTransaction? _transaction = transaction;
private readonly List<DomainEventEnvelope> _collectedEvents = new();

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Clearing LocalEventEnqueuer in DisposeAsync can race with other local transactions on the same DbContext.

EfCoreLocalTransaction assigns dbContext.LocalEventEnqueuer = localTx on creation and clears it in DisposeAsync. With multiple local transactions on the same DbContext (nested/overlapping UoWs), a later-disposing older transaction can overwrite/null the LocalEventEnqueuer set by a newer one, breaking event routing for the active transaction.

To avoid this, either:

  • only clear if it still points to this instance (e.g. if (ReferenceEquals(_context.LocalEventEnqueuer, this)) ...), or
  • enforce and validate a single-local-transaction-per-context invariant.

@sonarqubecloud
Copy link

sonarqubecloud bot commented Dec 9, 2025

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants