Skip to content

feat: Add conversation variable persistence layer #4

Open
tomerqodo wants to merge 21 commits intocoderabbit_full_base_feat_add_conversation_variable_persistence_layer__pr4from
coderabbit_full_head_feat_add_conversation_variable_persistence_layer__pr4
Open

feat: Add conversation variable persistence layer #4
tomerqodo wants to merge 21 commits intocoderabbit_full_base_feat_add_conversation_variable_persistence_layer__pr4from
coderabbit_full_head_feat_add_conversation_variable_persistence_layer__pr4

Conversation

@tomerqodo
Copy link

@tomerqodo tomerqodo commented Jan 30, 2026

Benchmark PR from agentic-review-benchmarks#4

Summary by CodeRabbit

  • Refactor

    • Reorganized conversation variable persistence architecture to use a dedicated layer, improving separation of concerns and system efficiency.
  • Tests

    • Added comprehensive test coverage for the new persistence layer and updated existing tests to reflect architectural changes.

✏️ Tip: You can customize this high-level summary in your review settings.

laipz8200 and others added 21 commits January 25, 2026 12:04
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path.

- `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`.
- `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes.
- `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected.

Tests not run.

Next steps (optional):
1) `make lint`
2) `make type-check`
3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args.

- `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes.

No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
…be-kanban 0941477f)

Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing ConversationId from the engine's variable pool. It captures the successful execution event of each node and determines whether the type of this node is VariableAssigner(v1 and v2). If so, it retrieves the variable name and value that need to be updated from the node's outputs. This layer is only used in the Advanced Chat. It should be placed outside of Core.Workflow package.
…rs/conversation_variable_persist_layer.py` to satisfy SIM118

- chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall)
- chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting

- test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage

- refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage

Tests not run.
…n and remove value type validation

- test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances
- fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool

Tests not run.
…e handling for v1/v2 process_data

- refactor(app-layer): read updated variables from process_data in conversation variable persistence layer
- test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads

Tests not run.
…fter venv changes)

- chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies)

Details:
- `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation"

This reverts commit 5ebc87a.
…h SegmentType validation and casting"

This reverts commit 3edd525.
…y out of core.workflow into `api/services/conversation_variable_updater.py`

- refactor(app): update advanced chat app runner and conversation service to import the new updater factory

Tests not run.
…-linter module missing)

- chore(type-check): run `make type-check` (fails: 1275 missing type stubs)

Details:
- `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
@coderabbitai
Copy link

coderabbitai bot commented Jan 30, 2026

📝 Walkthrough

Walkthrough

The pull request refactors conversation variable persistence from direct node-level handling to a graph engine layer pattern. It removes ConversationVariableUpdater integration from VariableAssigner nodes, introduces ConversationVariablePersistenceLayer as a GraphEngineLayer that listens for VARIABLE_ASSIGNER completion events, and updates the VariablePool interface to use selector-based lookups.

Changes

Cohort / File(s) Summary
Dependency Management
api/.importlinter, api/services/conversation_service.py
Removed ignore rule for variable_assigner path and updated import source for conversation_variable_updater_factory to services module.
Graph Engine Layer
api/core/app/layers/conversation_variable_persist_layer.py, api/core/app/apps/advanced_chat/app_runner.py
New ConversationVariablePersistenceLayer listens for VARIABLE_ASSIGNER node completion events and persists conversation-scoped variables; app_runner attaches this layer to workflow graph engine.
VariablePool Interface Updates
api/core/workflow/runtime/graph_runtime_state_protocol.py, api/core/workflow/runtime/read_only_wrappers.py
Changed VariablePool.get signature from two parameters (node_id, variable_key) to single positional-only selector (Sequence[str]) parameter.
VariableAssigner Node Refactoring
api/core/workflow/nodes/variable_assigner/v1/node.py, api/core/workflow/nodes/variable_assigner/v2/node.py
Removed ConversationVariableUpdater imports and usage; v2 adds explicit init with graph state parameters; conversation variable persistence logic delegated to layer.
Cleanup & Utilities
api/core/workflow/nodes/node_factory.py
Removed extraneous blank line.
Service Layer Updates
api/services/conversation_variable_updater.py
Added ConversationVariableNotFoundError exception; refactored update/flush methods with explicit session handling and return type annotations.
Tests
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py, api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py, api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py, api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py
New integration tests for ConversationVariablePersistenceLayer; v1/v2 tests updated to verify node outputs via events instead of updater mocks; MockReadOnlyVariablePool updated to use selector-based get signature.

Sequence Diagram

sequenceDiagram
    participant VA as VariableAssigner<br/>Node
    participant Engine as Graph<br/>Engine
    participant Layer as Conversation<br/>VariablePersist<br/>Layer
    participant Pool as Variable<br/>Pool
    participant Updater as Conversation<br/>Variable<br/>Updater
    participant DB as Database

    VA->>Engine: node execution completes
    activate Engine
    Engine->>Engine: emit NodeRunSucceededEvent
    Engine->>Layer: on_event(NodeRunSucceededEvent)
    deactivate Engine
    
    activate Layer
    Layer->>Layer: validate event type & node type
    Layer->>VA: extract updated_variables from<br/>node run result
    
    loop for each conversation-scoped variable
        Layer->>Pool: get(selector)
        activate Pool
        Pool-->>Layer: Variable
        deactivate Pool
        
        Layer->>Updater: update(conversation_id,<br/>variable)
        activate Updater
        Updater->>DB: update conversation_variable
        Updater-->>Layer: return
        deactivate Updater
        
        Layer->>Updater: flush()
    end
    deactivate Layer
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 The layer hops in, catches events with care,
No node need toil with persistence's dare,
Graph engine whispers, variables take flight,
To conversation's home, in the layer's light!
A cleaner design, refactored just right!

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning Description lacks required sections from template: missing Summary of changes, motivation, context, dependencies, and Screenshots. Only contains a benchmark PR reference. Add comprehensive Summary section with changes overview, motivation, dependencies, and populated checklist items. Remove minimal placeholder-like description.
Docstring Coverage ⚠️ Warning Docstring coverage is 13.89% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed Title is clear, specific, and directly reflects the main objective: introducing a new conversation variable persistence layer.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch coderabbit_full_head_feat_add_conversation_variable_persistence_layer__pr4

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@api/services/conversation_variable_updater.py`:
- Around line 14-24: The update method in conversation_variable_updater.py
currently creates a Session(db.engine) without a context manager which can leak
DB connections on exceptions (including when raising
ConversationVariableNotFoundError); change the session usage in update to use a
context manager (with Session(db.engine) as session:) so the session is always
closed, and move the select/row fetch, the raise, row.data assignment, and
session.commit() inside that with block so cleanup happens even on errors.
🧹 Nitpick comments (3)
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (1)

106-112: Consider a tiny helper to DRY event extraction.
The same “run → find succeeded event → get updated variables” sequence is repeated in three tests.

[

Details

♻️ Optional helper extraction
+def _get_updated_variables(node):
+    events = list(node.run())
+    succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent))
+    updated_variables = common_helpers.get_updated_variables(succeeded_event.node_run_result.process_data)
+    assert updated_variables is not None
+    return updated_variables
-    events = list(node.run())
-    succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent))
-    updated_variables = common_helpers.get_updated_variables(succeeded_event.node_run_result.process_data)
-    assert updated_variables is not None
+    updated_variables = _get_updated_variables(node)
](#)

Also applies to: 202-208, 287-293

api/services/conversation_service.py (1)

20-20: Consider keyword arguments for update() to avoid call-order ambiguity.

♻️ Suggested tweak
-            updater.update(conversation_id, updated_variable)
+            updater.update(conversation_id=conversation_id, variable=updated_variable)

Also applies to: 337-340

api/core/app/layers/conversation_variable_persist_layer.py (1)

38-53: Flush once after the loop to avoid per-variable overhead.

If flush() ever does real I/O, calling it per variable will be expensive. Consider flushing once after the loop (optionally guarded by a flag).

♻️ Suggested refactor
-        for item in updated_variables:
+        did_update = False
+        for item in updated_variables:
             selector = item.selector
             if len(selector) < 2:
                 logger.warning("Conversation variable selector invalid. selector=%s", selector)
                 continue
             if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
                 continue
             variable = self.graph_runtime_state.variable_pool.get(selector)
             if not isinstance(variable, Variable):
                 logger.warning(
                     "Conversation variable not found in variable pool. selector=%s",
                     selector,
                 )
                 continue
             self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
-            self._conversation_variable_updater.flush()
+            did_update = True
+        if did_update:
+            self._conversation_variable_updater.flush()

Comment on lines +14 to +24
def update(self, conversation_id: str, variable: Variable) -> None:
stmt = select(ConversationVariable).where(
ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
)
with Session(db.engine) as session:
row = session.scalar(stmt)
if not row:
raise VariableOperatorNodeError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()

def flush(self):
session = Session(db.engine)
row = session.scalar(stmt)
if not row:
raise ConversationVariableNotFoundError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, check the actual file content
cat -n api/services/conversation_variable_updater.py

Repository: agentic-review-benchmarks/dify-combined-coderabbit

Length of output: 1275


🏁 Script executed:

# Search for other session usage patterns in the codebase
rg "Session\(" --type py -B 2 -A 5 | head -100

Repository: agentic-review-benchmarks/dify-combined-coderabbit

Length of output: 7752


🏁 Script executed:

# Check if there's a db module/session factory
find . -name "*.py" -type f | xargs rg "Session|session_factory" -l | head -20

Repository: agentic-review-benchmarks/dify-combined-coderabbit

Length of output: 1497


🏁 Script executed:

# Look for context manager usage with Session
rg "with Session" --type py -B 2 -A 5

Repository: agentic-review-benchmarks/dify-combined-coderabbit

Length of output: 50407


Wrap the session in a context manager to prevent connection leaks.

The session is created without cleanup, which can leak connections on exceptions—including the not-found path where ConversationVariableNotFoundError is raised. This project consistently uses context managers throughout (as seen in trigger services, workflow services, etc.). Use with Session(db.engine) as session: to ensure cleanup.

🔧 Suggested fix
-        session = Session(db.engine)
-        row = session.scalar(stmt)
-        if not row:
-            raise ConversationVariableNotFoundError("conversation variable not found in the database")
-        row.data = variable.model_dump_json()
-        session.commit()
+        with Session(db.engine) as session:
+            row = session.scalar(stmt)
+            if not row:
+                raise ConversationVariableNotFoundError("conversation variable not found in the database")
+            row.data = variable.model_dump_json()
+            session.commit()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def update(self, conversation_id: str, variable: Variable) -> None:
stmt = select(ConversationVariable).where(
ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
)
with Session(db.engine) as session:
row = session.scalar(stmt)
if not row:
raise VariableOperatorNodeError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()
def flush(self):
session = Session(db.engine)
row = session.scalar(stmt)
if not row:
raise ConversationVariableNotFoundError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()
def update(self, conversation_id: str, variable: Variable) -> None:
stmt = select(ConversationVariable).where(
ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
)
with Session(db.engine) as session:
row = session.scalar(stmt)
if not row:
raise ConversationVariableNotFoundError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()
🤖 Prompt for AI Agents
In `@api/services/conversation_variable_updater.py` around lines 14 - 24, The
update method in conversation_variable_updater.py currently creates a
Session(db.engine) without a context manager which can leak DB connections on
exceptions (including when raising ConversationVariableNotFoundError); change
the session usage in update to use a context manager (with Session(db.engine) as
session:) so the session is always closed, and move the select/row fetch, the
raise, row.data assignment, and session.commit() inside that with block so
cleanup happens even on errors.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants