feat: diamond dependency consistency (epoch-based atomic groups)#37
Merged
feat: diamond dependency consistency (epoch-based atomic groups)#37
Conversation
d6e283c to
e99b638
Compare
BaardBouvet
approved these changes
Mar 2, 2026
eddae62 to
30d8c06
Compare
Implements Steps 1-3 of the diamond dependency consistency plan: Step 1 — Data structures: - Diamond struct (convergence node, shared sources, intermediates) - ConsistencyGroup struct (topologically-ordered members, convergence points, epoch counter) with is_singleton() and advance_epoch() helpers Step 2 — Diamond detection: - StDag::detect_diamonds() — walks fan-in ST nodes, computes transitive ancestor sets per upstream branch, identifies shared ancestors, and merges overlapping diamonds - collect_ancestors() — recursive upstream walk - merge_overlapping_diamonds() — union-find-style merging Step 3 — Consistency group computation: - StDag::compute_consistency_groups() — converts diamonds into scheduler-ready groups in topological order, merges overlapping groups transitively, emits singleton groups for non-diamond STs 12 new unit tests covering simple/deep/nested diamonds, linear chains, independent multi-root fan-ins, overlapping diamonds, group merging, singleton detection, and epoch advancement. All 965 unit tests pass. Also updates PLAN_DIAMOND_DEPENDENCY_CONSISTENCY.md with implementation progress and prioritized remaining work (Steps 4-8).
…r, monitoring, E2E) - Add DiamondConsistency enum (None/Atomic) and pg_trickle.diamond_consistency GUC - Add diamond_consistency column to pgt_stream_tables catalog table - Add diamond_consistency parameter to create_stream_table/alter_stream_table - Replace flat scheduler loop with group-aware SAVEPOINT-based refresh - Add pgtrickle.diamond_groups() monitoring SQL function - Add 8 E2E tests in e2e_diamond_tests.rs - Add 4 unit tests for DiamondConsistency enum (969 total passing) - Update plan with progress markers (Steps 1-7 done, Step 8 remaining)
- SQL_REFERENCE: add diamond_consistency param to create/alter_stream_table, add pgtrickle.diamond_groups() function reference - CONFIGURATION: add pg_trickle.diamond_consistency GUC section and postgresql.conf example entry - ARCHITECTURE: add section 13 (Diamond Dependency Consistency) covering detection, consistency groups, scheduler SAVEPOINT wiring, and monitoring; add GUC row to quick reference table - CHANGELOG: add diamond dependency consistency feature under [Unreleased] - PLAN: mark Step 8 as Done, update remaining work section
The plan file was moved to plans/sql/ on main. Port all implementation notes (Steps 1-8, status tracking, remaining work) to the new location and remove the stale copy at plans/PLAN_DIAMOND_DEPENDENCY_CONSISTENCY.md.
…mentation plan - Mark Q1-Q3 and Q5 as resolved (already implemented in Steps 1-6) - Q4: decide on configurable diamond_schedule_policy GUC (fastest/slowest) with 'fastest' as default - Add §11: detailed 5-step implementation plan (Steps 9-13) for the new GUC, scheduler wiring, docs, unit tests, and E2E tests - Update remaining work to reference the new steps
…stency approaches Add comprehensive prior art section covering: - Jamie Brandon's internal consistency framing (scattered-thoughts.net) - Chandy-Lamport snapshots and vector clocks (foundational theory) - DBSP/Feldera synchronous stepping (diamond-free by construction) - Differential Dataflow frontier tracking and Timely Dataflow pointstamps - Apache Flink checkpoint barrier alignment (structural diamond analog) - Noria/Readyset partially-stateful upquery reconciliation - Materialize internal consistency (McSherry & Crooks 2020) - RisingWave barrier-based consistency - Comparison table positioning pg_trickle's epoch+SAVEPOINT approach
…cy design
- Replace §9 with detailed prior art survey (9.1–9.5):
- Jamie Brandon's internal consistency framing (scattered-thoughts.net)
- Chandy-Lamport snapshots and vector clocks (foundational theory)
- DBSP/Feldera synchronous stepping (diamond-free by construction)
- Differential Dataflow frontier tracking and Timely Dataflow pointstamps
- Apache Flink checkpoint barrier alignment (structural diamond analog)
- Noria/Readyset partially-stateful upquery reconciliation
- Materialize internal consistency (McSherry & Crooks 2020)
- RisingWave barrier-based consistency
- Comparison table positioning pg_trickle's epoch+SAVEPOINT approach
- Update Q4 resolution: diamond_schedule_policy is per-convergence-node
(Option A), not GUC-only; GUC serves as cluster-wide default fallback;
multiple convergence points use strictest-wins rule
- Expand Steps 9-13 implementation plan:
- Step 9: DiamondSchedulePolicy enum with stricter(), catalog column,
set_diamond_schedule_policy(), create/alter_stream_table params
- Step 10: group_schedule_policy() reads convergence_points + folds
with stricter(); is_group_due() uses effective policy
- Steps 12-13: add tests for stricter(), convergence-node-overrides-GUC,
nested-strictest-wins
Add per-convergence-node schedule policy for atomic diamond groups: - DiamondSchedulePolicy enum (Fastest/Slowest) with stricter() method - pg_trickle.diamond_schedule_policy GUC (default 'fastest') - diamond_schedule_policy catalog column, StreamTableMeta field - create_stream_table / alter_stream_table params - diamond_groups() now returns schedule_policy column - Scheduler uses group_schedule_policy() + is_group_due() helpers to evaluate group-level schedule instead of per-member checks - 7 unit tests + 7 E2E tests - Docs: CONFIGURATION.md, ARCHITECTURE.md, SQL_REFERENCE.md, CHANGELOG.md - Plan: all 13 steps marked complete
30d8c06 to
1a3aeea
Compare
b.val + c.val are both INT4 columns; PostgreSQL returns INT4 for the sum. The test declared total as i64 (INT8) causing a runtime type mismatch.
…acros
dbt wraps each model's statements in an implicit BEGIN...ROLLBACK block.
run_query() reuses the same connection, causing all pgtrickle DDL calls
(create/drop/alter/refresh) to be silently rolled back.
Fix: use {% call statement('...', auto_begin=False, fetch_result=False) %}
with explicit BEGIN; SQL; COMMIT; in all four adapter macros so DDL
commits unconditionally outside dbt's transaction scope.
Test script fixes: - PROJECT_ROOT: use ../.. not ../../.. (one level too many from integration_tests/) - Create .venv-dbt with Python 3.13; dbt-core 1.9 is incompatible with Python 3.14 - Use pip (inside venv) instead of pip3 (system binary) - Auto-detect Homebrew keg PostgreSQL bin dirs for psql on macOS Docs: - CHANGELOG.md: add Fixed section in [Unreleased] for all five bugs - dbt-pgtrickle/AGENTS.md: add critical DDL Transaction Safety section, remove stale adapter.cache_new reference, document .venv-dbt workflow - .gitignore: add .venv-dbt/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Diamond Dependency Consistency
Implements Option 1 (Epoch-Based Atomic Groups) from
PLAN_DIAMOND_DEPENDENCY_CONSISTENCY.md.Problem
When stream tables form diamond-shaped dependency graphs (A→B, A→C, B→D, C→D), the convergence node D may read a mix of fresh and stale data if one intermediate ST refreshes but another fails — a split-version inconsistency.
Solution
Detect diamonds in the DAG, group affected STs into consistency groups, and optionally wrap their refresh in a
SAVEPOINT. On any failure, the entire group is rolled back.Changes
Core (
src/dag.rs):Diamond/ConsistencyGroupstructsdetect_diamonds()andcompute_consistency_groups()algorithmsDiamondConsistencyenum (None/Atomic)Catalog + GUC:
diamond_consistencycolumn onpgt_stream_tablespg_trickle.diamond_consistencyGUC (default'none')diamond_consistencyparam oncreate_stream_table()/alter_stream_table()Scheduler (
src/scheduler.rs):Monitoring (
src/api.rs):pgtrickle.diamond_groups()SQL functionDocumentation:
Tests: