feat: Add conversation variable persistence layer #4
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path. - `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`. - `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes. - `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected. Tests not run. Next steps (optional): 1) `make lint` 2) `make type-check` 3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args. - `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes. No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
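The add-or-overwrite behavior described above can be sketched as follows. This is a minimal illustration only: `VariableUpdate` and `UpdateVariablesCommand` are names that appear in this PR's commit messages, but their fields here are assumptions, and `ToyVariablePool` and `handle_update_variables` are hypothetical stand-ins rather than the engine's real classes.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Sequence


@dataclass(frozen=True)
class VariableUpdate:
    """One selector/value pair (field layout is assumed for illustration)."""

    selector: tuple[str, ...]
    value: Any


@dataclass(frozen=True)
class UpdateVariablesCommand:
    """Carries a group of updates to the engine (field layout is assumed)."""

    updates: Sequence[VariableUpdate]


@dataclass
class ToyVariablePool:
    """Hypothetical stand-in for the variable pool: a dict keyed by selector."""

    _store: dict[tuple[str, ...], Any] = field(default_factory=dict)

    def add(self, selector: Sequence[str], value: Any) -> None:
        # Plain assignment covers both cases: a missing key is added,
        # an existing key is overwritten. No branching is needed.
        self._store[tuple(selector)] = value

    def get(self, selector: Sequence[str]) -> Any:
        return self._store.get(tuple(selector))


def handle_update_variables(pool: ToyVariablePool, command: UpdateVariablesCommand) -> None:
    """Apply every update in the command to the pool (hypothetical handler)."""
    for update in command.updates:
        pool.add(update.selector, update.value)
```

Because the handler never checks whether a selector already exists, "add" and "overwrite" are the same code path, which matches the requirement that the two cases need not be distinguished.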
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization and uses it to persist the ConversationVariables it receives to the database. It can retrieve the ID of the conversation currently being processed from the engine's variable pool. It captures the successful execution event of each node and checks whether the node is a VariableAssigner (v1 or v2). If so, it retrieves the variable names and values to update from the node's outputs. This layer is used only in Advanced Chat and should be placed outside the Core.Workflow package.
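A rough sketch of the layer described above, under stated assumptions: the event class, the updater, and the pool are toy stand-ins, and the constant values, the `("sys", "conversation_id")` selector, the node-type strings, and the `updated_selectors` field are all hypothetical. Only the overall flow (filter by event and node type, look up the conversation ID, persist conversation-scoped variables) follows the description.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

# Assumed values, chosen only for this sketch.
CONVERSATION_VARIABLE_NODE_ID = "conversation"
CONVERSATION_ID_SELECTOR = ("sys", "conversation_id")
VARIABLE_ASSIGNER_NODE_TYPES = {"variable_assigner_v1", "variable_assigner_v2"}


@dataclass
class NodeRunSucceededEvent:
    """Toy event: node type plus the selectors the node reported as updated."""

    node_type: str
    updated_selectors: list[tuple[str, ...]] = field(default_factory=list)


class RecordingUpdater:
    """Stand-in for ConversationVariableUpdater; records calls instead of writing to a DB."""

    def __init__(self) -> None:
        self.updates: list[tuple[str, Any]] = []
        self.flush_count = 0

    def update(self, conversation_id: str, variable: Any) -> None:
        self.updates.append((conversation_id, variable))

    def flush(self) -> None:
        self.flush_count += 1


class PersistenceLayerSketch:
    """Hypothetical layer: persists conversation variables after assigner nodes succeed."""

    def __init__(self, updater: RecordingUpdater, variable_pool: dict[tuple[str, ...], Any]) -> None:
        self._updater = updater
        self._pool = variable_pool

    def on_event(self, event: object) -> None:
        # Only successful VariableAssigner runs are relevant.
        if not isinstance(event, NodeRunSucceededEvent):
            return
        if event.node_type not in VARIABLE_ASSIGNER_NODE_TYPES:
            return
        conversation_id = self._pool.get(CONVERSATION_ID_SELECTOR)
        if conversation_id is None:
            return
        for selector in event.updated_selectors:
            # Skip anything that is not a conversation-scoped variable.
            if len(selector) < 2 or selector[0] != CONVERSATION_VARIABLE_NODE_ID:
                continue
            variable = self._pool.get(selector)
            if variable is None:
                continue
            self._updater.update(conversation_id=conversation_id, variable=variable)
            self._updater.flush()
```

Keeping the database-facing updater behind a constructor argument is what lets the layer live outside the workflow core: the engine only emits events, and the application layer decides what to persist.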
…rs/conversation_variable_persist_layer.py` to satisfy SIM118 - chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall) - chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting - test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage - refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage Tests not run.
…n and remove value type validation - test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances - fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool Tests not run.
…e handling for v1/v2 process_data - refactor(app-layer): read updated variables from process_data in conversation variable persistence layer - test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads Tests not run.
…nce reads from process_data
…fter venv changes) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies) Details: - `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py` - refactor(app): update advanced chat app runner and conversation service to import the new updater factory Tests not run.
…-linter module missing) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs) Details: - `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
📝 Walkthrough
The pull request refactors conversation variable persistence from direct node-level handling to a graph engine layer pattern. It removes ConversationVariableUpdater integration from VariableAssigner nodes, introduces ConversationVariablePersistenceLayer as a GraphEngineLayer that listens for VARIABLE_ASSIGNER completion events, and updates the VariablePool interface to use selector-based lookups.
Sequence Diagram
sequenceDiagram
participant VA as VariableAssigner<br/>Node
participant Engine as Graph<br/>Engine
participant Layer as Conversation<br/>VariablePersist<br/>Layer
participant Pool as Variable<br/>Pool
participant Updater as Conversation<br/>Variable<br/>Updater
participant DB as Database
VA->>Engine: node execution completes
activate Engine
Engine->>Engine: emit NodeRunSucceededEvent
Engine->>Layer: on_event(NodeRunSucceededEvent)
deactivate Engine
activate Layer
Layer->>Layer: validate event type & node type
Layer->>VA: extract updated_variables from<br/>node run result
loop for each conversation-scoped variable
Layer->>Pool: get(selector)
activate Pool
Pool-->>Layer: Variable
deactivate Pool
Layer->>Updater: update(conversation_id,<br/>variable)
activate Updater
Updater->>DB: update conversation_variable
Updater-->>Layer: return
deactivate Updater
Layer->>Updater: flush()
end
deactivate Layer
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@api/services/conversation_variable_updater.py`:
- Around line 14-24: The update method in conversation_variable_updater.py
currently creates a Session(db.engine) without a context manager which can leak
DB connections on exceptions (including when raising
ConversationVariableNotFoundError); change the session usage in update to use a
context manager (with Session(db.engine) as session:) so the session is always
closed, and move the select/row fetch, the raise, row.data assignment, and
session.commit() inside that with block so cleanup happens even on errors.
🧹 Nitpick comments (3)
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (1)
106-112: Consider a tiny helper to DRY event extraction.
The same “run → find succeeded event → get updated variables” sequence is repeated in three tests.
♻️ Optional helper extraction
+def _get_updated_variables(node):
+    events = list(node.run())
+    succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent))
+    updated_variables = common_helpers.get_updated_variables(succeeded_event.node_run_result.process_data)
+    assert updated_variables is not None
+    return updated_variables

-    events = list(node.run())
-    succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent))
-    updated_variables = common_helpers.get_updated_variables(succeeded_event.node_run_result.process_data)
-    assert updated_variables is not None
+    updated_variables = _get_updated_variables(node)
Also applies to: 202-208, 287-293
api/services/conversation_service.py (1)
20-20: Consider keyword arguments for update() to avoid call-order ambiguity.
♻️ Suggested tweak
- updater.update(conversation_id, updated_variable)
+ updater.update(conversation_id=conversation_id, variable=updated_variable)
Also applies to: 337-340
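One way to make the call-order concern impossible rather than merely discouraged is a keyword-only signature. The sketch below goes a step beyond the suggestion and is not what the PR does: `KeywordOnlyUpdater` is a hypothetical class, and only the parameter names `conversation_id` and `variable` come from the review.

```python
from __future__ import annotations

from typing import Any


class KeywordOnlyUpdater:
    """Hypothetical updater whose update() rejects positional arguments."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, Any]] = []

    def update(self, *, conversation_id: str, variable: Any) -> None:
        # The bare `*` makes both parameters keyword-only, so swapping the
        # two arguments by position raises TypeError instead of passing silently.
        self.calls.append((conversation_id, variable))


def positional_call_is_rejected(updater: KeywordOnlyUpdater) -> bool:
    """Return True if a positional call raises TypeError."""
    try:
        updater.update("conv-1", {"name": "count"})  # type: ignore[misc]
    except TypeError:
        return True
    return False
```

With this signature, `updater.update(conversation_id="conv-1", variable=v)` works as before, while a positional call fails immediately at the call site.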
api/core/app/layers/conversation_variable_persist_layer.py (1)
38-53: Flush once after the loop to avoid per-variable overhead.
If `flush()` ever does real I/O, calling it per variable will be expensive. Consider flushing once after the loop (optionally guarded by a flag).
♻️ Suggested refactor
- for item in updated_variables:
+ did_update = False
+ for item in updated_variables:
      selector = item.selector
      if len(selector) < 2:
          logger.warning("Conversation variable selector invalid. selector=%s", selector)
          continue
      if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
          continue
      variable = self.graph_runtime_state.variable_pool.get(selector)
      if not isinstance(variable, Variable):
          logger.warning(
              "Conversation variable not found in variable pool. selector=%s",
              selector,
          )
          continue
      self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
-     self._conversation_variable_updater.flush()
+     did_update = True
+ if did_update:
+     self._conversation_variable_updater.flush()
| def update(self, conversation_id: str, variable: Variable) -> None: | ||
| stmt = select(ConversationVariable).where( | ||
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | ||
| ) | ||
| with Session(db.engine) as session: | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise VariableOperatorNodeError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() | ||
|
|
||
| def flush(self): | ||
| session = Session(db.engine) | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() | ||
|
|
🧩 Analysis chain
🏁 Script executed:
# First, check the actual file content
cat -n api/services/conversation_variable_updater.py
Repository: agentic-review-benchmarks/dify-combined-coderabbit
Length of output: 1275
🏁 Script executed:
# Search for other session usage patterns in the codebase
rg "Session\(" --type py -B 2 -A 5 | head -100
Repository: agentic-review-benchmarks/dify-combined-coderabbit
Length of output: 7752
🏁 Script executed:
# Check if there's a db module/session factory
find . -name "*.py" -type f | xargs rg "Session|session_factory" -l | head -20
Repository: agentic-review-benchmarks/dify-combined-coderabbit
Length of output: 1497
🏁 Script executed:
# Look for context manager usage with Session
rg "with Session" --type py -B 2 -A 5
Repository: agentic-review-benchmarks/dify-combined-coderabbit
Length of output: 50407
Wrap the session in a context manager to prevent connection leaks.
The session is created without cleanup, which can leak connections on exceptions—including the not-found path where ConversationVariableNotFoundError is raised. This project consistently uses context managers throughout (as seen in trigger services, workflow services, etc.). Use with Session(db.engine) as session: to ensure cleanup.
🔧 Suggested fix
-        session = Session(db.engine)
-        row = session.scalar(stmt)
-        if not row:
-            raise ConversationVariableNotFoundError("conversation variable not found in the database")
-        row.data = variable.model_dump_json()
-        session.commit()
+        with Session(db.engine) as session:
+            row = session.scalar(stmt)
+            if not row:
+                raise ConversationVariableNotFoundError("conversation variable not found in the database")
+            row.data = variable.model_dump_json()
+            session.commit()
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def update(self, conversation_id: str, variable: Variable) -> None: | |
| stmt = select(ConversationVariable).where( | |
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | |
| ) | |
| with Session(db.engine) as session: | |
| row = session.scalar(stmt) | |
| if not row: | |
| raise VariableOperatorNodeError("conversation variable not found in the database") | |
| row.data = variable.model_dump_json() | |
| session.commit() | |
| def flush(self): | |
| session = Session(db.engine) | |
| row = session.scalar(stmt) | |
| if not row: | |
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | |
| row.data = variable.model_dump_json() | |
| session.commit() | |
| def update(self, conversation_id: str, variable: Variable) -> None: | |
| stmt = select(ConversationVariable).where( | |
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | |
| ) | |
| with Session(db.engine) as session: | |
| row = session.scalar(stmt) | |
| if not row: | |
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | |
| row.data = variable.model_dump_json() | |
| session.commit() |
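The difference between the two session styles can be seen in a small, database-free sketch. `FakeSession` is a hypothetical stand-in that only imitates the context-manager behavior of a SQLAlchemy `Session` (closing on exit); `update_without_context_manager` and `update_with_context_manager` are illustrative names, and rows are plain dicts rather than ORM objects.

```python
from __future__ import annotations

from typing import Any


class ConversationVariableNotFoundError(Exception):
    """Raised when the requested row does not exist (same name as in the review)."""


class FakeSession:
    """Stand-in for a DB session that tracks whether it was closed or committed."""

    def __init__(self, rows: dict[str, dict[str, Any]]) -> None:
        self._rows = rows
        self.closed = False
        self.committed = False

    def scalar(self, key: str) -> dict[str, Any] | None:
        return self._rows.get(key)

    def commit(self) -> None:
        self.committed = True

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> FakeSession:
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Always close, whether or not the block raised; do not swallow errors.
        self.close()
        return False


def update_without_context_manager(session: FakeSession, key: str, data: str) -> None:
    """Mirrors the flagged pattern: nothing closes the session on the error path."""
    row = session.scalar(key)
    if row is None:
        raise ConversationVariableNotFoundError("conversation variable not found in the database")
    row["data"] = data
    session.commit()


def update_with_context_manager(session: FakeSession, key: str, data: str) -> None:
    """Mirrors the suggested fix: the with-block closes the session even on errors."""
    with session:
        row = session.scalar(key)
        if row is None:
            raise ConversationVariableNotFoundError("conversation variable not found in the database")
        row["data"] = data
        session.commit()
```

Calling either function with a missing key raises the same error, but only the second leaves the session closed afterwards, which is the cleanup guarantee the review comment asks for.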
Benchmark PR from agentic-review-benchmarks#4
Summary by CodeRabbit
Refactor
Tests