Add local DB sync pipeline for rain.orderbook data #56
Conversation
Add local_db_path to Config struct for specifying the local SQLite database path used by the sync pipeline. Update registry URLs in both dev and production configs to latest strategies revision.
Add db_path field to RaindexProvider and inject a RusqliteExecutor-backed LocalDb into each RaindexClient worker. This allows query endpoints to read from the locally synced SQLite database.
Introduce sync module that bootstraps the SQLite schema and runs rain.orderbook's ClientRunner sync loop per network on a dedicated thread. Main wires it up: creates the DB directory, starts sync, and passes the DB path to RaindexProvider. Includes 9 unit tests covering bootstrap, settings parsing, network key extraction/deduplication, runner creation, and error paths.
Picks up latest changes including rusqlite version bump.
📝 Walkthrough

Adds local DB sync.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant Config as Config Loader
    participant Sync as sync::start_sync
    participant Registry as RaindexProvider
    participant LocalDB as LocalDb/RusqliteExecutor
    participant Runner as ClientRunner
    App->>Config: load configs (incl. local_db_path)
    Config-->>App: Config { local_db_path, ... }
    App->>Sync: start_sync(settings_yaml, db_path)
    activate Sync
    Sync->>LocalDB: bootstrap/create schema at db_path
    LocalDB-->>Sync: ready
    Sync->>Registry: load registry
    Registry-->>Sync: Registry instance
    Sync->>Runner: spawn per-network runners
    loop periodic
        Runner->>LocalDB: execute sync queries
        LocalDB-->>Runner: results
    end
    deactivate Sync
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/config.rs (1)
12-20: ⚠️ Potential issue | 🟡 Minor — Fail fast when `local_db_path` is empty.

`local_db_path` currently allows empty values, which defers failure to later runtime paths. Validate it during `Config::load` so misconfiguration fails early with a clear error.

Suggested patch
```diff
 impl Config {
     pub fn load(path: &Path) -> Result<Self, String> {
         let contents = std::fs::read_to_string(path)
             .map_err(|e| format!("failed to read config: {e}"))?;
-        toml::from_str(&contents).map_err(|e| format!("failed to parse config: {e}"))
+        let config: Self =
+            toml::from_str(&contents).map_err(|e| format!("failed to parse config: {e}"))?;
+
+        if config.local_db_path.trim().is_empty() {
+            return Err("failed to parse config: local_db_path must not be empty".to_string());
+        }
+
+        Ok(config)
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config.rs` around lines 12 - 20, Config::load should validate that the parsed Config's local_db_path is non-empty and return an Err if it is empty; after successfully parsing with toml::from_str in Config::load, check config.local_db_path.trim().is_empty() and return a clear Err like "local_db_path cannot be empty" (including context) instead of allowing empty values to defer failure to runtime. Ensure you reference Config::load and the local_db_path field when adding this check.
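The fail-fast check at the heart of the patch can be exercised on its own. A minimal stdlib-only sketch — the `toml` parsing step and the surrounding `Config` struct are stubbed out, so the function name here is purely illustrative:

```rust
// Hypothetical standalone version of the validation suggested above.
// In the real patch this check runs inside Config::load after toml::from_str.
fn validate_local_db_path(local_db_path: &str) -> Result<&str, String> {
    // Treat whitespace-only values the same as empty ones.
    if local_db_path.trim().is_empty() {
        return Err("failed to parse config: local_db_path must not be empty".to_string());
    }
    Ok(local_db_path)
}

fn main() {
    assert!(validate_local_db_path("data/raindex.db").is_ok());
    assert!(validate_local_db_path("   ").is_err());
    assert!(validate_local_db_path("").is_err());
    println!("ok");
}
```

Trimming before the emptiness check means a value like `"  "` fails at load time rather than surfacing later as a cryptic SQLite open error.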
🧹 Nitpick comments (1)
src/sync.rs (1)
56-62: Consider using `HashSet` for network key deduplication.

The current approach using `Vec::contains()` is O(n) per insertion. While this is fine for small numbers of orderbooks, a `HashSet` would be more idiomatic and efficient.

♻️ Optional refactor using HashSet

```diff
+use std::collections::HashSet;
+
 // In run_sync:
-    let mut network_keys: Vec<String> = Vec::new();
-    for ob in settings.orderbooks.values() {
-        if !network_keys.contains(&ob.network.key) {
-            network_keys.push(ob.network.key.clone());
-        }
-    }
-    network_keys.sort();
+    let mut network_keys: Vec<String> = settings
+        .orderbooks
+        .values()
+        .map(|ob| ob.network.key.clone())
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect();
+    network_keys.sort();
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sync.rs` around lines 56 - 62, Replace the O(n) dedup loop that builds network_keys with a HashSet-based collection: create a HashSet<String>, iterate over settings.orderbooks (use the same iterator over ob and ob.network.key), insert each ob.network.key into the set to deduplicate, then collect the set into a Vec<String> and call sort() to produce the final network_keys; update the variable named network_keys and keep references to settings.orderbooks and ob.network.key so the change is localized.
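The refactor above can be sketched with plain string slices standing in for the `settings.orderbooks` values (the real types come from rain.orderbook and are not reproduced here):

```rust
use std::collections::HashSet;

// Stdlib-only sketch of the HashSet-based dedup suggested in the review.
// The &[&str] input is a stand-in for iterating ob.network.key over
// settings.orderbooks.values().
fn dedup_network_keys(orderbook_network_keys: &[&str]) -> Vec<String> {
    let mut network_keys: Vec<String> = orderbook_network_keys
        .iter()
        .map(|k| k.to_string())
        .collect::<HashSet<_>>() // deduplicates in O(1) per insertion
        .into_iter()
        .collect();
    network_keys.sort(); // HashSet iteration order is unspecified, so sort for determinism
    network_keys
}

fn main() {
    let keys = dedup_network_keys(&["base", "flare", "base", "arbitrum"]);
    assert_eq!(keys, vec!["arbitrum", "base", "flare"]);
    println!("ok");
}
```

The trailing `sort()` matters: without it the runner spawn order would vary between runs because `HashSet` iteration order is unspecified.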
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (9)
- .gitignore
- Cargo.toml
- config/dev.toml
- config/rest-api.toml
- lib/rain.orderbook
- src/config.rs
- src/main.rs
- src/raindex/config.rs
- src/sync.rs
Replace manual sync orchestration (~125 lines) with a single scheduler::start() call that handles thread spawn, runtime, DB bootstrap, settings parsing, and per-network sync loops internally. Returns NativeSyncHandle for lifecycle management; server now exits on sync init failure.
Actionable comments posted: 2
📒 Files selected for processing (5)
- config/dev.toml
- config/rest-api.toml
- lib/rain.orderbook
- src/main.rs
- src/sync.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- config/rest-api.toml
- config/dev.toml
```rust
if let Some(parent) = local_db_path.parent() {
    if !parent.exists() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            tracing::error!(error = %e, path = %parent.display(), "failed to create local db directory");
            drop(log_guard);
            std::process::exit(1);
        }
    }
}
```
Guard against empty parent paths for single-file DB paths.
For values like local_db_path = "local.db", parent() can be empty; attempting directory creation on that path can fail and abort startup.
💡 Proposed fix

```diff
-    if let Some(parent) = local_db_path.parent() {
+    if let Some(parent) = local_db_path
+        .parent()
+        .filter(|p| !p.as_os_str().is_empty())
+    {
         if !parent.exists() {
             if let Err(e) = std::fs::create_dir_all(parent) {
                 tracing::error!(error = %e, path = %parent.display(), "failed to create local db directory");
                 drop(log_guard);
                 std::process::exit(1);
             }
         }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/main.rs` around lines 239 - 247, The code assumes local_db_path.parent()
yields a valid directory and attempts to call create_dir_all on it, which fails
for single-file paths like "local.db" where parent() is None or empty; update
the block around local_db_path.parent() to explicitly handle empty or rootless
parents by skipping directory creation when parent is None or when
parent.as_os_str() is empty (or equals "."), only calling
std::fs::create_dir_all(parent) when a real directory path exists, and keep the
existing error handling (tracing::error!, drop(log_guard),
std::process::exit(1)) for actual creation failures.
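The behavior the comment is guarding against is easy to demonstrate in isolation: for a bare relative filename, `Path::parent()` returns `Some("")` rather than `None`, so the `filter` on the parent's emptiness is what actually skips directory creation. A self-contained sketch (the helper name is illustrative, not from the PR):

```rust
use std::path::Path;

// Returns the directory that would be created for a given DB path,
// or None when there is no real parent directory (bare filename).
fn dir_to_create(db_path: &str) -> Option<String> {
    Path::new(db_path)
        .parent()
        // For "local.db", parent() is Some("") — not None — so this
        // filter is required to skip directory creation.
        .filter(|p| !p.as_os_str().is_empty())
        .map(|p| p.display().to_string())
}

fn main() {
    assert_eq!(dir_to_create("data/raindex.db"), Some("data".to_string()));
    assert_eq!(dir_to_create("local.db"), None);
    println!("ok");
}
```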
Our configs always use paths with a parent directory. A bare filename would still exit with an error — not a silent bug. Skipping for now.
@findolor, understood — if your configurations consistently provide parent directories, the guard isn't necessary. Thanks for clarifying your setup!
✏️ Learnings added
Learnt from: findolor
Repo: ST0x-Technology/st0x.rest.api PR: 56
File: src/main.rs:239-247
Timestamp: 2026-03-02T09:02:05.104Z
Learning: In the ST0x-Technology/st0x.rest.api repository, configuration files always specify database paths with parent directories (e.g., "data/raindex.db" or "/mnt/data/st0x-rest-api/raindex.db"), never bare filenames like "local.db". This is a project configuration convention.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: findolor
Repo: ST0x-Technology/st0x.rest.api PR: 18
File: src/fairings/rate_limiter.rs:158-203
Timestamp: 2026-02-13T09:37:37.357Z
Learning: When multiple guards or layers write to the same cached rate-limit information, document and enforce the execution order so that the client-visible headers reflect the intended quota (global vs per-key). In this repo, rate-limiter.rs and auth-related guards intentionally write to the same CachedRateLimitInfo entry; ensure the per-key quota is what’s surfaced to clients and that the order is explicit in route signatures. Include tests that verify the final headers reflect the intended quota and that writes are intentional and auditable.
```rust
let _sync_handle = match sync::start_sync(settings_yaml, local_db_path.clone()) {
    Ok(handle) => handle,
    Err(e) => {
        tracing::error!(error = %e, "failed to start local db sync");
        drop(log_guard);
        std::process::exit(1);
    }
};
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
# Search for NativeSyncHandle struct definition and implementations
echo "=== Searching for NativeSyncHandle definition ==="
rg -n "struct NativeSyncHandle" --type rust
echo ""
echo "=== Searching for Drop implementation for NativeSyncHandle ==="
rg -n "impl Drop for NativeSyncHandle" --type rust
echo ""
echo "=== Searching for stop_and_join method ==="
rg -n "fn stop_and_join|pub fn stop_and_join" --type rust
echo ""
echo "=== Searching for NativeSyncHandle usage patterns ==="
rg -n "NativeSyncHandle|sync::start_sync" --type rust -B 2 -A 5 | head -100
echo ""
echo "=== Checking sync module for Drop or shutdown behavior ==="
fd -e rs "sync" | head -20
```

Repository: ST0x-Technology/st0x.rest.api
Length of output: 122
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
echo "=== Finding sync module/files ==="
find . -name "sync.rs" -o -name "sync" -type d | grep -v target | grep -v ".git"
echo ""
echo "=== Searching for start_sync function definition ==="
rg -n "fn start_sync" --type rust -B 2 -A 10
echo ""
echo "=== Checking for sync-related imports in main.rs ==="
rg -n "use.*sync|mod sync" src/main.rs --type rust -B 2 -A 2
echo ""
echo "=== Looking at Cargo.toml for dependencies ==="
cat Cargo.toml | head -50
```

Repository: ST0x-Technology/st0x.rest.api
Length of output: 2417
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
echo "=== Reading full sync.rs file ==="
wc -l src/sync.rs
echo ""
cat -n src/sync.rs
```

Repository: ST0x-Technology/st0x.rest.api
Length of output: 2177
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
echo "=== Checking if rain_orderbook is a submodule ==="
ls -la lib/rain.orderbook/ 2>/dev/null | head -20
echo ""
echo "=== Searching for NativeSyncHandle in submodules ==="
find lib/rain.orderbook -name "*.rs" -type f 2>/dev/null | xargs grep -l "struct NativeSyncHandle\|impl Drop.*NativeSyncHandle" 2>/dev/null | head -10
echo ""
echo "=== Searching for stop_and_join implementation in submodules ==="
find lib/rain.orderbook -name "*.rs" -type f 2>/dev/null | xargs grep -l "fn stop_and_join" 2>/dev/null | head -10
```

Repository: ST0x-Technology/st0x.rest.api
Length of output: 285
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
echo "=== Full context around sync_handle in main.rs ==="
rg -n "sync_handle|stop_and_join" src/main.rs -B 5 -A 10
echo ""
echo "=== Check if there's a shutdown hook or cleanup code ==="
rg -n "shutdown|cleanup|drop|on_attach|on_launch" src/main.rs --type rust | head -30
echo ""
echo "=== Check Rocket lifecycle hooks ==="
rg -n "State|attach|guard|catch" src/main.rs --type rust -B 2 -A 2 | head -50
```

Repository: ST0x-Technology/st0x.rest.api
Length of output: 2980
Explicitly call stop_and_join() on the sync handle before shutdown.
The `_sync_handle` is stored with an underscore prefix, but `stop_and_join()` is never invoked on it. The test in `src/sync.rs` (line 58) demonstrates the intended pattern: `result.unwrap().stop_and_join()`. Call this method explicitly during graceful shutdown instead of relying on `Drop` semantics.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/main.rs` around lines 250 - 257, The code currently assigns the sync
handle from sync::start_sync to _sync_handle but never calls stop_and_join(),
relying on Drop; change this so the handle is stored (remove leading underscore)
and call its stop_and_join() before shutdown (in the Err(e) branch or during
graceful shutdown flow) — i.e., capture the Ok(handle) into a variable like
sync_handle and invoke sync_handle.stop_and_join(). Ensure you handle the Result
from stop_and_join() (unwrap or log error) consistent with the test pattern
result.unwrap().stop_and_join().
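The tradeoff under discussion — blocking `stop_and_join()` in tests versus letting process exit reap the thread in production — can be illustrated with a stdlib-only sketch. The stop-flag/`JoinHandle` shape below is an assumption for illustration; the real `NativeSyncHandle` lives in rain.orderbook and may differ:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical sync handle: a shared stop flag plus the worker's JoinHandle.
struct SyncHandle {
    stop: Arc<AtomicBool>,
    worker: JoinHandle<u64>,
}

impl SyncHandle {
    fn start() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&stop);
        let worker = thread::spawn(move || {
            let mut passes: u64 = 0;
            while !flag.load(Ordering::Relaxed) {
                passes += 1; // stand-in for one sync pass
                thread::sleep(Duration::from_millis(1));
            }
            passes
        });
        Self { stop, worker }
    }

    // Signals the worker and blocks until the in-flight pass finishes.
    // This blocking is what tests want (no leaked threads) and what the
    // author deliberately skips at process exit.
    fn stop_and_join(self) -> u64 {
        self.stop.store(true, Ordering::Relaxed);
        self.worker.join().unwrap_or(0)
    }
}

fn main() {
    let handle = SyncHandle::start();
    thread::sleep(Duration::from_millis(10));
    let passes = handle.stop_and_join();
    assert!(passes >= 1);
    println!("ok");
}
```

Dropping the handle without calling `stop_and_join()` leaves the thread running until process exit, which is exactly the production behavior the author describes.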
_sync_handle intentionally keeps the handle alive without calling methods. stop_and_join() would block on in-flight network requests during shutdown — unnecessary since process exit cleans up the thread. The test uses it only to avoid leaking threads between runs.
@findolor, thank you for the clarification! That makes perfect sense—blocking on in-flight network requests during shutdown would be counterproductive, and the OS cleanup on process exit is sufficient for production. I appreciate you explaining the distinction between test cleanup and production shutdown behavior.
✏️ Learnings added
Learnt from: findolor
Repo: ST0x-Technology/st0x.rest.api PR: 56
File: src/main.rs:250-257
Timestamp: 2026-03-02T09:02:06.122Z
Learning: In src/main.rs, the `_sync_handle` from `sync::start_sync()` intentionally uses an underscore prefix to keep the NativeSyncHandle alive without calling `stop_and_join()`. Calling `stop_and_join()` during production shutdown would block on in-flight network requests, which is unnecessary since process exit cleans up the thread automatically. The `stop_and_join()` method is only used in tests to avoid leaking threads between test runs.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: CR
Repo: ST0x-Technology/st0x.rest.api PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-02-19T09:28:42.183Z
Learning: Applies to **/*.rs : Never use `expect` or `unwrap` in production code; handle errors gracefully or exit with a message
Learnt from: findolor
Repo: ST0x-Technology/st0x.rest.api PR: 18
File: src/fairings/rate_limiter.rs:158-203
Timestamp: 2026-02-13T09:37:37.357Z
Learning: When multiple guards or layers write to the same cached rate-limit information, document and enforce the execution order so that the client-visible headers reflect the intended quota (global vs per-key). In this repo, rate-limiter.rs and auth-related guards intentionally write to the same CachedRateLimitInfo entry; ensure the per-key quota is what’s surfaced to clients and that the order is explicit in route signatures. Include tests that verify the final headers reflect the intended quota and that writes are intentional and auditable.
Dependent PRs
Motivation
The REST API needs to sync orderbook data locally via SQLite so query endpoints can read from a local database instead of relying solely on subgraph queries. This wires rain.orderbook's sync pipeline into the st0x REST API.
Solution
- `local_db_path` config field for specifying the SQLite database location
- `scheduler::start()` to handle DB sync (thread spawn, runtime, bootstrap, settings parsing, per-network sync loops)
- `LocalDb` into `RaindexProvider` worker threads so query endpoints can read from the synced database
- `LocalDb`/`RusqliteExecutor` public APIs

Deployment Note
Production registry URL needs to be updated to:
Checks
By submitting this for review, I'm confirming I've done the following:
Summary by CodeRabbit
New Features
Chores