From 15c4771d3ba31e4fd8ee9dc52007c128bac3f584 Mon Sep 17 00:00:00 2001 From: user1303836 Date: Sun, 8 Mar 2026 21:51:35 -0400 Subject: [PATCH] Add protocol/TVL analytics pack: Gold datasets, protocol activity and TVL endpoints (P5-W3) Co-Authored-By: Claude Opus 4.6 Entire-Checkpoint: e7b6f974cd94 --- README.md | 35 +- adapters/src/lib.rs | 1 + adapters/src/protocol_analytics.rs | 632 ++++++++++++++++++ adapters/src/v2_repo.rs | 225 ++++++- api/src/main.rs | 396 ++++++++++- core/src/materializer.rs | 266 +++++++- .../20260311000000_add_protocol_tvl_gold.sql | 56 ++ 7 files changed, 1599 insertions(+), 12 deletions(-) create mode 100644 adapters/src/protocol_analytics.rs create mode 100644 migrations/20260311000000_add_protocol_tvl_gold.sql diff --git a/README.md b/README.md index b2833b2..dfa0c18 100644 --- a/README.md +++ b/README.md @@ -288,8 +288,10 @@ These endpoints are the preferred forward path for new consumers. They provide f | `GET` | `/v1/forensics/activity` | Wallet interaction analysis (top counterparties, cross-chain summary) | | `GET` | `/v1/analytics/hl/trader` | Per-trader Hyperliquid PnL analytics (win rate, volume, coin breakdown) | | `GET` | `/v1/analytics/hl/market` | Per-coin Hyperliquid market analytics (volume, traders, PnL distribution) | +| `GET` | `/v1/analytics/protocol/activity` | Per-protocol event activity (event counts, unique participants, time range) | +| `GET` | `/v1/analytics/protocol/tvl` | TVL analytics from pool snapshots (per-pool, per-protocol, aggregate) | -The dataset records endpoint supports filtering via `?target_id=&network=&time_start=&time_end=&limit=N&offset=N` query parameters (all optional). Queryable datasets: `token_transfers`, `native_balance_deltas`, `decoded_events`, `hl_fills`, `hl_funding`, `positions`, `wallet_ledger`, `balance_history`, `hl_pnl_summary`, `hl_trade_history`. +The dataset records endpoint supports filtering via `?target_id=&network=&time_start=&time_end=&limit=N&offset=N` query parameters (all optional). Queryable datasets: `token_transfers`, `native_balance_deltas`, `decoded_events`, `hl_fills`, `hl_funding`, `positions`, `wallet_ledger`, `balance_history`, `hl_pnl_summary`, `hl_trade_history`, `protocol_events`, `pool_snapshots`. ### POST /v1/ingest @@ -468,7 +470,7 @@ curl -X POST http://127.0.0.1:3000/v1/export/dataset \ # Response: {"id": "", "state": "pending", "dataset": "token_transfers", "format": "jsonl", "record_count": null, "message": null} ``` -Creates an async export job for a dataset. Supported formats: `jsonl` (default), `csv`. Optional filters: `target_id`, `network`, `time_start`, `time_end`. Exportable datasets: `token_transfers`, `native_balance_deltas`, `decoded_events`, `hl_fills`, `hl_funding`, `positions`, `wallet_ledger`, `balance_history`, `hl_pnl_summary`, `hl_trade_history`. Maximum 100,000 records per export. +Creates an async export job for a dataset. Supported formats: `jsonl` (default), `csv`. Optional filters: `target_id`, `network`, `time_start`, `time_end`. Exportable datasets: `token_transfers`, `native_balance_deltas`, `decoded_events`, `hl_fills`, `hl_funding`, `positions`, `wallet_ledger`, `balance_history`, `hl_pnl_summary`, `hl_trade_history`, `protocol_events`, `pool_snapshots`. Maximum 100,000 records per export. #### Sink delivery @@ -580,7 +582,7 @@ Downloads the completed export data. 
The response includes a `Content-Dispositio ### Gold Datasets -Spectraplex includes four Gold-tier datasets materialized from Silver data: +Spectraplex includes six Gold-tier datasets materialized from Silver data: - **`wallet_ledger`** — wallet-scoped financial records with counterparty tracking, network awareness, and nullable cost basis / proceeds fields. Derived from `token_transfers`, `native_balance_deltas`, `hl_fills`, and `hl_funding`. Queryable and exportable via the standard dataset API. @@ -590,6 +592,10 @@ Spectraplex includes four Gold-tier datasets materialized from Silver data: - **`hl_trade_history`** — logical trade records built by grouping Silver `hl_fills` into open→close sequences. Each record captures entry/exit price, size, realized PnL, fees, and the number of fills that composed the trade. Enables trade-by-trade performance analysis. +- **`protocol_events`** — protocol-level event records derived from Silver `decoded_events`. Each record captures a significant protocol interaction (swap, mint, burn, liquidity change) classified by event type, with a link to the originating decoded event. Supports per-protocol activity analytics. + +- **`pool_snapshots`** — per-pool reserve state snapshots derived from Silver `decoded_events` and `token_transfers`. Each record captures token pair reserves, optional USD-denominated TVL, and protocol association. Supports TVL tracking and pool-level analytics. + All Gold datasets are queryable via `/v1/datasets/{name}/records` and exportable via `/v1/export/dataset`. ### GET /v1/export/tax @@ -639,6 +645,28 @@ curl -H "Authorization: Bearer " \ Returns per-coin Hyperliquid market analytics: total volume, unique trader count, total trades, aggregate PnL, and average trade size per coin. Supports `?target_id`, `?network`, `?time_start`, `?time_end` filters. +### GET /v1/analytics/protocol/activity + +```bash +curl -H "Authorization: Bearer " \ + "http://127.0.0.1:3000/v1/analytics/protocol/activity?protocol_address=0xUniswap" + +# Response: {"protocol_address": "0xUniswap", "event_counts_by_type": [{"event_type": "swap", "count": 100}], "unique_participants": 50, "total_events": 110, "time_start": 1700000000, "time_end": 1700100000} +``` + +Returns per-protocol event activity analytics: event counts grouped by type, unique participant count, and time range. Supports `?target_id`, `?network`, `?time_start`, `?time_end`, `?protocol_address` filters. + +### GET /v1/analytics/protocol/tvl + +```bash +curl -H "Authorization: Bearer " \ + "http://127.0.0.1:3000/v1/analytics/protocol/tvl" + +# Response: {"pools": [{"pool_address": "0xPool", "protocol_address": "0xProto", "reserve0": "1000", "reserve1": "2000000", "tvl_usd": "4000000", ...}], "total_tvl": "4000000", "protocols": [...]} +``` + +Returns TVL analytics from pool snapshots: per-pool reserves and TVL, per-protocol aggregate TVL, and overall total TVL. Supports `?target_id`, `?network`, `?time_start`, `?time_end` filters. + ## Configuration Spectraplex uses a layered configuration system powered by [figment](https://crates.io/crates/figment). 
Settings are loaded in order of priority: @@ -747,6 +775,7 @@ All tables use UUIDs as primary keys and support idempotent batch inserts (`ON C - [ ] Add ETL-first delivery modes such as dataset exports, sink jobs, and warehouse-friendly outputs - [x] Keep wallet/tax/forensics materializations first-class, but as downstream products of the broader indexing core (P5-W1: wallet_ledger, balance_history, tax export, forensics activity) - [x] Add Hyperliquid-specific analytics as Gold datasets proving a dashboard can consume Spectraplex outputs directly (P5-W2: hl_pnl_summary, hl_trade_history, trader and market analytics) +- [x] Add protocol/TVL analytics as Gold datasets enabling protocol dashboards from Spectraplex outputs (P5-W3: protocol_events, pool_snapshots, protocol activity and TVL analytics) The detailed roadmap lives in [`SPECTRAPLEX_STRATEGY_AND_EXECUTION_PLAN.md`](SPECTRAPLEX_STRATEGY_AND_EXECUTION_PLAN.md). diff --git a/adapters/src/lib.rs b/adapters/src/lib.rs index c4b15ca..66f0c1c 100644 --- a/adapters/src/lib.rs +++ b/adapters/src/lib.rs @@ -7,6 +7,7 @@ pub mod hyperliquid; pub mod hyperliquid_parser; pub mod hyperliquid_ws; pub mod ledger_derivation; +pub mod protocol_analytics; pub mod repo; pub mod solana; pub mod solana_grpc; diff --git a/adapters/src/protocol_analytics.rs b/adapters/src/protocol_analytics.rs new file mode 100644 index 0000000..b103adb --- /dev/null +++ b/adapters/src/protocol_analytics.rs @@ -0,0 +1,632 @@ +//! Gold-tier protocol analytics materialization and aggregation. +//! +//! Computes Gold datasets (protocol_events, pool_snapshots) from Silver +//! decoded_events and token_transfers, and builds dashboard-ready +//! analytics responses (ProtocolActivity, TvlAnalytics). + +use bigdecimal::BigDecimal; +use spectraplex_core::materializer::{ + DecodedEvent, EventTypeCount, PoolSnapshot, PoolTvlSummary, ProtocolActivity, ProtocolEvent, + ProtocolTvlSummary, TokenTransfer, TvlAnalytics, +}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use uuid::Uuid; + +/// Classify a decoded event name into a protocol event type. +fn classify_event(event_name: Option<&str>) -> &'static str { + match event_name { + Some(name) => { + let lower = name.to_lowercase(); + if lower.contains("swap") { + "swap" + } else if lower.contains("mint") { + "mint" + } else if lower.contains("burn") { + "burn" + } else if lower.contains("addliquidity") + || lower.contains("add_liquidity") + || lower.contains("liquidityadded") + || lower.contains("deposit") + { + "liquidity_added" + } else if lower.contains("removeliquidity") + || lower.contains("remove_liquidity") + || lower.contains("liquidityremoved") + || lower.contains("withdraw") + { + "liquidity_removed" + } else if lower.contains("transfer") { + "transfer" + } else { + "other" + } + } + None => "other", + } +} + +/// Compute protocol event records from decoded_events. +/// +/// Groups decoded events by their `program_or_contract` (treated as the +/// protocol address) and classifies each event by its name. 
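+///
+/// # Example
+///
+/// Illustrative sketch only — `load_decoded_events()` is a hypothetical
+/// helper standing in for a repository query. The mapping is 1:1, so one
+/// `ProtocolEvent` is produced per input `DecodedEvent`.
+///
+/// ```ignore
+/// let events: Vec<DecodedEvent> = load_decoded_events();
+/// let protocol_events = compute_protocol_events(&events, Some("0xPool"));
+/// assert_eq!(protocol_events.len(), events.len());
+/// ```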
+pub fn compute_protocol_events( + decoded_events: &[DecodedEvent], + pool_address_hint: Option<&str>, +) -> Vec { + let now = chrono::Utc::now(); + decoded_events + .iter() + .map(|de| { + let event_type = classify_event(de.event_name.as_deref()); + ProtocolEvent { + id: Uuid::new_v4(), + network: de.network.clone(), + protocol_address: de.program_or_contract.clone(), + protocol_name: None, + event_type: event_type.to_string(), + event_details: de.decoded_fields.clone(), + pool_address: pool_address_hint.map(|s| s.to_string()), + raw_event_id: Some(de.id), + timestamp: de.created_at.timestamp(), + dataset_version_id: de.dataset_version_id, + created_at: now, + } + }) + .collect() +} + +/// Compute pool snapshot records from decoded events and token transfers. +/// +/// This is a simplified materialization that derives pool state from +/// events that mention token pair interactions. In production, this would +/// be enhanced with actual reserve tracking from on-chain state. +pub fn compute_pool_snapshots( + decoded_events: &[DecodedEvent], + _token_transfers: &[TokenTransfer], + pool_address: &str, + token0: (&str, Option<&str>), + token1: (&str, Option<&str>), +) -> Vec { + let now = chrono::Utc::now(); + + if decoded_events.is_empty() { + return Vec::new(); + } + + // Derive a single snapshot from the latest event + let latest = decoded_events.iter().max_by_key(|e| e.created_at).unwrap(); + + let protocol_address = latest.program_or_contract.clone(); + + // Extract reserve values from decoded_fields if available + let reserve0 = latest + .decoded_fields + .get("reserve0") + .or_else(|| latest.decoded_fields.get("amount0")) + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(|| BigDecimal::from(0)); + + let reserve1 = latest + .decoded_fields + .get("reserve1") + .or_else(|| latest.decoded_fields.get("amount1")) + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(|| BigDecimal::from(0)); + + vec![PoolSnapshot { + id: Uuid::new_v4(), + network: latest.network.clone(), + pool_address: pool_address.to_string(), + protocol_address, + protocol_name: None, + token0_address: token0.0.to_string(), + token0_symbol: token0.1.map(|s| s.to_string()), + token1_address: token1.0.to_string(), + token1_symbol: token1.1.map(|s| s.to_string()), + reserve0, + reserve1, + tvl_usd: None, + snapshot_timestamp: latest.created_at.timestamp(), + block_number: None, + dataset_version_id: latest.dataset_version_id, + created_at: now, + }] +} + +/// Build per-protocol activity analytics from protocol event records. 
+pub fn build_protocol_activity( + protocol_address: &str, + events: &[ProtocolEvent], +) -> ProtocolActivity { + let mut type_counts: BTreeMap = BTreeMap::new(); + let mut participants: HashSet = HashSet::new(); + let mut time_start: Option = None; + let mut time_end: Option = None; + + for event in events { + if event.protocol_address != protocol_address { + continue; + } + *type_counts.entry(event.event_type.clone()).or_insert(0) += 1; + + // Extract participant addresses from event_details if available + if let Some(from) = event.event_details.get("from").and_then(|v| v.as_str()) { + participants.insert(from.to_string()); + } + if let Some(to) = event.event_details.get("to").and_then(|v| v.as_str()) { + participants.insert(to.to_string()); + } + if let Some(sender) = event.event_details.get("sender").and_then(|v| v.as_str()) { + participants.insert(sender.to_string()); + } + if let Some(recipient) = event + .event_details + .get("recipient") + .and_then(|v| v.as_str()) + { + participants.insert(recipient.to_string()); + } + + time_start = Some(time_start.map_or(event.timestamp, |t: i64| t.min(event.timestamp))); + time_end = Some(time_end.map_or(event.timestamp, |t: i64| t.max(event.timestamp))); + } + + let total_events: usize = type_counts.values().sum(); + + let event_counts_by_type = type_counts + .into_iter() + .map(|(event_type, count)| EventTypeCount { event_type, count }) + .collect(); + + ProtocolActivity { + protocol_address: protocol_address.to_string(), + event_counts_by_type, + unique_participants: participants.len(), + total_events, + time_start, + time_end, + } +} + +/// Build TVL analytics from pool snapshots. +pub fn build_tvl_analytics(snapshots: &[PoolSnapshot]) -> TvlAnalytics { + let mut pools: Vec = Vec::new(); + let mut protocol_map: HashMap, Vec>)> = + HashMap::new(); + + // Use latest snapshot per pool + let mut latest_by_pool: HashMap = HashMap::new(); + for snap in snapshots { + let entry = latest_by_pool + .entry(snap.pool_address.clone()) + .or_insert(snap); + if snap.snapshot_timestamp > entry.snapshot_timestamp { + *entry = snap; + } + } + + for snap in latest_by_pool.values() { + pools.push(PoolTvlSummary { + pool_address: snap.pool_address.clone(), + protocol_address: snap.protocol_address.clone(), + token0_symbol: snap.token0_symbol.clone(), + token1_symbol: snap.token1_symbol.clone(), + reserve0: snap.reserve0.clone(), + reserve1: snap.reserve1.clone(), + tvl_usd: snap.tvl_usd.clone(), + snapshot_timestamp: snap.snapshot_timestamp, + }); + + let entry = protocol_map + .entry(snap.protocol_address.clone()) + .or_insert_with(|| (snap.protocol_name.clone(), Vec::new())); + entry.1.push(snap.tvl_usd.clone()); + } + + pools.sort_by(|a, b| a.pool_address.cmp(&b.pool_address)); + + let mut protocols: Vec = protocol_map + .into_iter() + .map(|(addr, (name, tvls))| { + let total_tvl = if tvls.iter().all(|t| t.is_some()) { + Some( + tvls.iter() + .filter_map(|t| t.as_ref()) + .fold(BigDecimal::from(0), |acc, v| acc + v), + ) + } else if tvls.iter().any(|t| t.is_some()) { + Some( + tvls.iter() + .filter_map(|t| t.as_ref()) + .fold(BigDecimal::from(0), |acc, v| acc + v), + ) + } else { + None + }; + ProtocolTvlSummary { + protocol_address: addr, + protocol_name: name, + pool_count: tvls.len(), + total_tvl, + } + }) + .collect(); + protocols.sort_by(|a, b| a.protocol_address.cmp(&b.protocol_address)); + + let total_tvl = { + let all_tvls: Vec<&BigDecimal> = pools.iter().filter_map(|p| p.tvl_usd.as_ref()).collect(); + if all_tvls.is_empty() { + None + } else { + 
Some( + all_tvls + .into_iter() + .fold(BigDecimal::from(0), |acc, v| acc + v), + ) + } + }; + + TvlAnalytics { + pools, + total_tvl, + protocols, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bigdecimal::BigDecimal; + use std::str::FromStr; + + fn make_decoded_event( + contract: &str, + event_name: Option<&str>, + network: &str, + fields: serde_json::Value, + ) -> DecodedEvent { + DecodedEvent { + id: Uuid::new_v4(), + raw_transaction_id: None, + network: network.to_string(), + program_or_contract: contract.to_string(), + event_signature: None, + event_name: event_name.map(|s| s.to_string()), + log_index: 0, + decoded_fields: fields, + raw_fields: serde_json::json!({}), + dataset_version_id: None, + created_at: chrono::Utc::now(), + } + } + + #[test] + fn classify_event_names() { + assert_eq!(classify_event(Some("Swap")), "swap"); + assert_eq!(classify_event(Some("swap")), "swap"); + assert_eq!(classify_event(Some("Mint")), "mint"); + assert_eq!(classify_event(Some("Burn")), "burn"); + assert_eq!(classify_event(Some("AddLiquidity")), "liquidity_added"); + assert_eq!(classify_event(Some("add_liquidity")), "liquidity_added"); + assert_eq!(classify_event(Some("Deposit")), "liquidity_added"); + assert_eq!(classify_event(Some("RemoveLiquidity")), "liquidity_removed"); + assert_eq!(classify_event(Some("Withdraw")), "liquidity_removed"); + assert_eq!(classify_event(Some("Transfer")), "transfer"); + assert_eq!(classify_event(Some("Approval")), "other"); + assert_eq!(classify_event(None), "other"); + } + + #[test] + fn compute_protocol_events_maps_decoded_events() { + let events = vec![ + make_decoded_event( + "0xUniswap", + Some("Swap"), + "ethereum-mainnet", + serde_json::json!({"amount0": "100", "amount1": "-50"}), + ), + make_decoded_event( + "0xUniswap", + Some("Mint"), + "ethereum-mainnet", + serde_json::json!({"amount0": "200", "amount1": "400"}), + ), + make_decoded_event( + "0xSushi", + Some("Transfer"), + "ethereum-mainnet", + serde_json::json!({}), + ), + ]; + let protocol_events = compute_protocol_events(&events, Some("0xPool")); + assert_eq!(protocol_events.len(), 3); + assert_eq!(protocol_events[0].event_type, "swap"); + assert_eq!(protocol_events[0].protocol_address, "0xUniswap"); + assert_eq!(protocol_events[0].pool_address.as_deref(), Some("0xPool")); + assert_eq!(protocol_events[1].event_type, "mint"); + assert_eq!(protocol_events[2].event_type, "transfer"); + assert_eq!(protocol_events[2].protocol_address, "0xSushi"); + } + + #[test] + fn compute_protocol_events_empty() { + let result = compute_protocol_events(&[], None); + assert!(result.is_empty()); + } + + #[test] + fn compute_pool_snapshots_from_events() { + let events = vec![make_decoded_event( + "0xUniswap", + Some("Swap"), + "ethereum-mainnet", + serde_json::json!({"reserve0": "1000", "reserve1": "2000000"}), + )]; + let snapshots = compute_pool_snapshots( + &events, + &[], + "0xPool", + ("0xWETH", Some("WETH")), + ("0xUSDC", Some("USDC")), + ); + assert_eq!(snapshots.len(), 1); + assert_eq!(snapshots[0].pool_address, "0xPool"); + assert_eq!(snapshots[0].protocol_address, "0xUniswap"); + assert_eq!(snapshots[0].token0_symbol.as_deref(), Some("WETH")); + assert_eq!(snapshots[0].token1_symbol.as_deref(), Some("USDC")); + assert_eq!(snapshots[0].reserve0, BigDecimal::from_str("1000").unwrap()); + assert_eq!( + snapshots[0].reserve1, + BigDecimal::from_str("2000000").unwrap() + ); + } + + #[test] + fn compute_pool_snapshots_empty() { + let result = compute_pool_snapshots(&[], &[], "0xPool", ("0xA", None), ("0xB", 
None)); + assert!(result.is_empty()); + } + + #[test] + fn protocol_activity_aggregation() { + let events = vec![ + ProtocolEvent { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + protocol_address: "0xUniswap".to_string(), + protocol_name: None, + event_type: "swap".to_string(), + event_details: serde_json::json!({"from": "0xAlice", "to": "0xBob"}), + pool_address: None, + raw_event_id: None, + timestamp: 1000, + dataset_version_id: None, + created_at: chrono::Utc::now(), + }, + ProtocolEvent { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + protocol_address: "0xUniswap".to_string(), + protocol_name: None, + event_type: "swap".to_string(), + event_details: serde_json::json!({"from": "0xAlice", "sender": "0xCharlie"}), + pool_address: None, + raw_event_id: None, + timestamp: 2000, + dataset_version_id: None, + created_at: chrono::Utc::now(), + }, + ProtocolEvent { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + protocol_address: "0xUniswap".to_string(), + protocol_name: None, + event_type: "mint".to_string(), + event_details: serde_json::json!({"sender": "0xAlice"}), + pool_address: None, + raw_event_id: None, + timestamp: 3000, + dataset_version_id: None, + created_at: chrono::Utc::now(), + }, + ]; + + let activity = build_protocol_activity("0xUniswap", &events); + assert_eq!(activity.protocol_address, "0xUniswap"); + assert_eq!(activity.total_events, 3); + // Unique participants: 0xAlice, 0xBob, 0xCharlie + assert_eq!(activity.unique_participants, 3); + assert_eq!(activity.time_start, Some(1000)); + assert_eq!(activity.time_end, Some(3000)); + // Two event types: swap (2) and mint (1) + assert_eq!(activity.event_counts_by_type.len(), 2); + let swap_count = activity + .event_counts_by_type + .iter() + .find(|e| e.event_type == "swap") + .unwrap(); + assert_eq!(swap_count.count, 2); + let mint_count = activity + .event_counts_by_type + .iter() + .find(|e| e.event_type == "mint") + .unwrap(); + assert_eq!(mint_count.count, 1); + } + + #[test] + fn protocol_activity_empty() { + let activity = build_protocol_activity("0xNone", &[]); + assert_eq!(activity.total_events, 0); + assert_eq!(activity.unique_participants, 0); + assert!(activity.time_start.is_none()); + assert!(activity.time_end.is_none()); + } + + #[test] + fn tvl_analytics_single_pool() { + let snapshots = vec![PoolSnapshot { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPool".to_string(), + protocol_address: "0xProto".to_string(), + protocol_name: Some("Uniswap".to_string()), + token0_address: "0xWETH".to_string(), + token0_symbol: Some("WETH".to_string()), + token1_address: "0xUSDC".to_string(), + token1_symbol: Some("USDC".to_string()), + reserve0: BigDecimal::from(1000), + reserve1: BigDecimal::from(2000000), + tvl_usd: Some(BigDecimal::from(4000000)), + snapshot_timestamp: 1700000000, + block_number: Some(18000000), + dataset_version_id: None, + created_at: chrono::Utc::now(), + }]; + + let analytics = build_tvl_analytics(&snapshots); + assert_eq!(analytics.pools.len(), 1); + assert_eq!(analytics.total_tvl, Some(BigDecimal::from(4000000))); + assert_eq!(analytics.protocols.len(), 1); + assert_eq!(analytics.protocols[0].pool_count, 1); + assert_eq!( + analytics.protocols[0].total_tvl, + Some(BigDecimal::from(4000000)) + ); + } + + #[test] + fn tvl_analytics_multiple_pools() { + let snapshots = vec![ + PoolSnapshot { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPoolA".to_string(), + 
protocol_address: "0xProto".to_string(), + protocol_name: Some("Uniswap".to_string()), + token0_address: "0xWETH".to_string(), + token0_symbol: Some("WETH".to_string()), + token1_address: "0xUSDC".to_string(), + token1_symbol: Some("USDC".to_string()), + reserve0: BigDecimal::from(1000), + reserve1: BigDecimal::from(2000000), + tvl_usd: Some(BigDecimal::from(4000000)), + snapshot_timestamp: 1700000000, + block_number: None, + dataset_version_id: None, + created_at: chrono::Utc::now(), + }, + PoolSnapshot { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPoolB".to_string(), + protocol_address: "0xProto".to_string(), + protocol_name: Some("Uniswap".to_string()), + token0_address: "0xDAI".to_string(), + token0_symbol: Some("DAI".to_string()), + token1_address: "0xUSDC".to_string(), + token1_symbol: Some("USDC".to_string()), + reserve0: BigDecimal::from(500000), + reserve1: BigDecimal::from(500000), + tvl_usd: Some(BigDecimal::from(1000000)), + snapshot_timestamp: 1700000000, + block_number: None, + dataset_version_id: None, + created_at: chrono::Utc::now(), + }, + ]; + + let analytics = build_tvl_analytics(&snapshots); + assert_eq!(analytics.pools.len(), 2); + assert_eq!(analytics.total_tvl, Some(BigDecimal::from(5000000))); + assert_eq!(analytics.protocols.len(), 1); + assert_eq!(analytics.protocols[0].pool_count, 2); + assert_eq!( + analytics.protocols[0].total_tvl, + Some(BigDecimal::from(5000000)) + ); + } + + #[test] + fn tvl_analytics_no_usd_values() { + let snapshots = vec![PoolSnapshot { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPool".to_string(), + protocol_address: "0xProto".to_string(), + protocol_name: None, + token0_address: "0xA".to_string(), + token0_symbol: None, + token1_address: "0xB".to_string(), + token1_symbol: None, + reserve0: BigDecimal::from(100), + reserve1: BigDecimal::from(200), + tvl_usd: None, + snapshot_timestamp: 1700000000, + block_number: None, + dataset_version_id: None, + created_at: chrono::Utc::now(), + }]; + + let analytics = build_tvl_analytics(&snapshots); + assert!(analytics.total_tvl.is_none()); + assert!(analytics.protocols[0].total_tvl.is_none()); + } + + #[test] + fn tvl_analytics_empty() { + let analytics = build_tvl_analytics(&[]); + assert!(analytics.pools.is_empty()); + assert!(analytics.protocols.is_empty()); + assert!(analytics.total_tvl.is_none()); + } + + #[test] + fn tvl_analytics_uses_latest_snapshot_per_pool() { + let snapshots = vec![ + PoolSnapshot { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPool".to_string(), + protocol_address: "0xProto".to_string(), + protocol_name: None, + token0_address: "0xA".to_string(), + token0_symbol: None, + token1_address: "0xB".to_string(), + token1_symbol: None, + reserve0: BigDecimal::from(100), + reserve1: BigDecimal::from(200), + tvl_usd: Some(BigDecimal::from(1000)), + snapshot_timestamp: 1000, + block_number: None, + dataset_version_id: None, + created_at: chrono::Utc::now(), + }, + PoolSnapshot { + id: Uuid::new_v4(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPool".to_string(), + protocol_address: "0xProto".to_string(), + protocol_name: None, + token0_address: "0xA".to_string(), + token0_symbol: None, + token1_address: "0xB".to_string(), + token1_symbol: None, + reserve0: BigDecimal::from(500), + reserve1: BigDecimal::from(600), + tvl_usd: Some(BigDecimal::from(5000)), + snapshot_timestamp: 2000, + block_number: None, + dataset_version_id: None, + created_at: 
chrono::Utc::now(), + }, + ]; + + let analytics = build_tvl_analytics(&snapshots); + // Should pick the later snapshot (timestamp 2000) + assert_eq!(analytics.pools.len(), 1); + assert_eq!(analytics.total_tvl, Some(BigDecimal::from(5000))); + } +} diff --git a/adapters/src/v2_repo.rs b/adapters/src/v2_repo.rs index 5afcbbb..33fc4bd 100644 --- a/adapters/src/v2_repo.rs +++ b/adapters/src/v2_repo.rs @@ -6,7 +6,8 @@ use chrono::{DateTime, Utc}; use spectraplex_core::materializer::{ BalanceSnapshot, DecodedEvent, HlFillRecord, HlFundingPayment, HlPnlSummary, HlPositionChange, - HlTradeHistory, NativeBalanceDelta, TokenTransfer, WalletLedgerRecord, + HlTradeHistory, NativeBalanceDelta, PoolSnapshot, ProtocolEvent, TokenTransfer, + WalletLedgerRecord, }; use spectraplex_core::v2::{ ChainFamily, Checkpoint, CompletenessStatus, DatasetCompleteness, DatasetVersion, @@ -983,6 +984,43 @@ fn row_to_hl_trade_history(row: &sqlx::postgres::PgRow) -> anyhow::Result anyhow::Result { + Ok(ProtocolEvent { + id: row.try_get("id")?, + network: row.try_get("network")?, + protocol_address: row.try_get("protocol_address")?, + protocol_name: row.try_get("protocol_name")?, + event_type: row.try_get("event_type")?, + event_details: row.try_get("event_details")?, + pool_address: row.try_get("pool_address")?, + raw_event_id: row.try_get("raw_event_id")?, + timestamp: row.try_get("timestamp")?, + dataset_version_id: row.try_get("dataset_version_id")?, + created_at: row.try_get("created_at")?, + }) +} + +fn row_to_pool_snapshot(row: &sqlx::postgres::PgRow) -> anyhow::Result { + Ok(PoolSnapshot { + id: row.try_get("id")?, + network: row.try_get("network")?, + pool_address: row.try_get("pool_address")?, + protocol_address: row.try_get("protocol_address")?, + protocol_name: row.try_get("protocol_name")?, + token0_address: row.try_get("token0_address")?, + token0_symbol: row.try_get("token0_symbol")?, + token1_address: row.try_get("token1_address")?, + token1_symbol: row.try_get("token1_symbol")?, + reserve0: row.try_get("reserve0")?, + reserve1: row.try_get("reserve1")?, + tvl_usd: row.try_get("tvl_usd")?, + snapshot_timestamp: row.try_get("snapshot_timestamp")?, + block_number: row.try_get("block_number")?, + dataset_version_id: row.try_get("dataset_version_id")?, + created_at: row.try_get("created_at")?, + }) +} + // --------------------------------------------------------------------------- // Query builders for dataset_completeness (P3-W6) // --------------------------------------------------------------------------- @@ -2568,6 +2606,99 @@ impl Repository { ) .await } + + // -- P5-W3: Protocol / TVL Gold dataset query/export methods -- + + pub async fn query_protocol_events( + &self, + target_id: Option, + network: Option<&str>, + time_start: Option, + time_end: Option, + limit: i64, + offset: i64, + ) -> anyhow::Result> { + let cols = "dt.id, dt.network, dt.protocol_address, dt.protocol_name, dt.event_type, \ + dt.event_details, dt.pool_address, dt.raw_event_id, dt.timestamp, \ + dt.dataset_version_id, dt.created_at"; + let (sql, args) = build_dataset_filter_query( + cols, + "protocol_events", + "dt.timestamp", + target_id, + network, + time_start, + time_end, + limit, + offset, + )?; + let rows = sqlx::query_with(&sql, args).fetch_all(self.pool()).await?; + rows.iter().map(row_to_protocol_event).collect() + } + + pub async fn export_protocol_events( + &self, + target_id: Option, + network: Option<&str>, + time_start: Option, + time_end: Option, + ) -> anyhow::Result> { + self.query_protocol_events( + target_id, + 
network, + time_start, + time_end, + Self::EXPORT_MAX_RECORDS, + 0, + ) + .await + } + + pub async fn query_pool_snapshots( + &self, + target_id: Option, + network: Option<&str>, + time_start: Option, + time_end: Option, + limit: i64, + offset: i64, + ) -> anyhow::Result> { + let cols = "dt.id, dt.network, dt.pool_address, dt.protocol_address, dt.protocol_name, \ + dt.token0_address, dt.token0_symbol, dt.token1_address, dt.token1_symbol, \ + dt.reserve0, dt.reserve1, dt.tvl_usd, dt.snapshot_timestamp, dt.block_number, \ + dt.dataset_version_id, dt.created_at"; + let (sql, args) = build_dataset_filter_query( + cols, + "pool_snapshots", + "dt.snapshot_timestamp", + target_id, + network, + time_start, + time_end, + limit, + offset, + )?; + let rows = sqlx::query_with(&sql, args).fetch_all(self.pool()).await?; + rows.iter().map(row_to_pool_snapshot).collect() + } + + pub async fn export_pool_snapshots( + &self, + target_id: Option, + network: Option<&str>, + time_start: Option, + time_end: Option, + ) -> anyhow::Result> { + self.query_pool_snapshots( + target_id, + network, + time_start, + time_end, + Self::EXPORT_MAX_RECORDS, + 0, + ) + .await + } } // --------------------------------------------------------------------------- @@ -3763,4 +3894,96 @@ mod tests { } let _ = _check; } + + // -- P5-W3: protocol_events and pool_snapshots query builders -- + + #[test] + fn protocol_events_query_builder_basic() { + let (sql, _) = build_dataset_filter_query( + "dt.*", + "protocol_events", + "dt.timestamp", + None, + None, + None, + None, + 50, + 0, + ) + .unwrap(); + assert!(sql.contains("protocol_events")); + assert!(sql.contains("LIMIT")); + } + + #[test] + fn protocol_events_query_builder_with_network() { + let (sql, _) = build_dataset_filter_query( + "dt.*", + "protocol_events", + "dt.timestamp", + None, + Some("ethereum-mainnet"), + None, + None, + 50, + 0, + ) + .unwrap(); + assert!(sql.contains("dt.network")); + } + + #[test] + fn pool_snapshots_query_builder_basic() { + let (sql, _) = build_dataset_filter_query( + "dt.*", + "pool_snapshots", + "dt.snapshot_timestamp", + None, + None, + None, + None, + 50, + 0, + ) + .unwrap(); + assert!(sql.contains("pool_snapshots")); + assert!(sql.contains("LIMIT")); + } + + #[test] + fn pool_snapshots_query_builder_with_time_range() { + let (sql, _) = build_dataset_filter_query( + "dt.*", + "pool_snapshots", + "dt.snapshot_timestamp", + None, + None, + Some(1000), + Some(2000), + 50, + 0, + ) + .unwrap(); + // Time filtering joins raw_transactions and uses rt.timestamp + assert!(sql.contains("rt.timestamp >=")); + assert!(sql.contains("rt.timestamp <=")); + } + + #[test] + fn repo_query_protocol_events_is_send() { + fn _assert_send(_: F) {} + fn _check(repo: &Repository) { + _assert_send(repo.query_protocol_events(None, None, None, None, 50, 0)); + } + let _ = _check; + } + + #[test] + fn repo_query_pool_snapshots_is_send() { + fn _assert_send(_: F) {} + fn _check(repo: &Repository) { + _assert_send(repo.query_pool_snapshots(None, None, None, None, 50, 0)); + } + let _ = _check; + } } diff --git a/api/src/main.rs b/api/src/main.rs index 7292dca..2aeb7c7 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -22,8 +22,9 @@ use spectraplex_core::config::AppConfig; use spectraplex_core::connector::validate_target; use spectraplex_core::materializer::{ BalanceSnapshot, DatasetName, DeliveryMetadata, DeliveryReceipt, ExportFormat, ExportSink, - ForensicsActivity, HlPnlSummary, HlTradeHistory, MarketAnalytics, SinkConfig, SinkType, - TraderAnalytics, 
WalletLedgerRecord, + ForensicsActivity, HlPnlSummary, HlTradeHistory, MarketAnalytics, PoolSnapshot, + ProtocolActivity, ProtocolEvent, SinkConfig, SinkType, TraderAnalytics, TvlAnalytics, + WalletLedgerRecord, }; use spectraplex_core::models::{Chain, ChainIngestor, IndexerCheckpoint, LedgerEntry, Transaction}; use spectraplex_core::v2::{ @@ -308,6 +309,11 @@ async fn main() -> anyhow::Result<()> { .route("/v1/forensics/activity", get(forensics_activity_handler)) .route("/v1/analytics/hl/trader", get(hl_trader_analytics_handler)) .route("/v1/analytics/hl/market", get(hl_market_analytics_handler)) + .route( + "/v1/analytics/protocol/activity", + get(protocol_activity_handler), + ) + .route("/v1/analytics/protocol/tvl", get(protocol_tvl_handler)) .layer(middleware::from_fn_with_state( Arc::clone(&shared_state), require_auth, @@ -1589,6 +1595,8 @@ const QUERYABLE_DATASETS: &[&str] = &[ "balance_history", "hl_pnl_summary", "hl_trade_history", + "protocol_events", + "pool_snapshots", ]; fn validate_dataset_name(name: &str) -> Result<(), AppError> { @@ -1805,6 +1813,36 @@ async fn query_dataset_records( .map_err(AppError::internal)?; serde_json::to_value(&records).map_err(AppError::internal)? } + "protocol_events" => { + let records = state + .repo + .query_protocol_events( + params.target_id, + net, + params.time_start, + params.time_end, + limit, + offset, + ) + .await + .map_err(AppError::internal)?; + serde_json::to_value(&records).map_err(AppError::internal)? + } + "pool_snapshots" => { + let records = state + .repo + .query_pool_snapshots( + params.target_id, + net, + params.time_start, + params.time_end, + limit, + offset, + ) + .await + .map_err(AppError::internal)?; + serde_json::to_value(&records).map_err(AppError::internal)? + } _ => unreachable!("dataset name validated above"), }; @@ -1828,6 +1866,8 @@ const EXPORTABLE_DATASETS: &[&str] = &[ "balance_history", "hl_pnl_summary", "hl_trade_history", + "protocol_events", + "pool_snapshots", ]; fn content_type_for_format(format: ExportFormat) -> &'static str { @@ -2141,6 +2181,64 @@ fn hl_trade_history_to_csv(records: &[HlTradeHistory]) -> Vec { buf.into_bytes() } +fn protocol_events_to_csv(records: &[ProtocolEvent]) -> Vec { + let mut buf = String::from( + "id,network,protocol_address,protocol_name,event_type,event_details,pool_address,raw_event_id,timestamp,dataset_version_id,created_at\n", + ); + for r in records { + buf.push_str(&format!( + "{},{},{},{},{},{},{},{},{},{},{}\n", + r.id, + csv_escape(&r.network), + csv_escape(&r.protocol_address), + r.protocol_name.as_deref().unwrap_or(""), + csv_escape(&r.event_type), + csv_escape(&r.event_details.to_string()), + r.pool_address.as_deref().unwrap_or(""), + r.raw_event_id.map(|u| u.to_string()).unwrap_or_default(), + r.timestamp, + r.dataset_version_id + .map(|u| u.to_string()) + .unwrap_or_default(), + r.created_at.to_rfc3339(), + )); + } + buf.into_bytes() +} + +fn pool_snapshots_to_csv(records: &[PoolSnapshot]) -> Vec { + let mut buf = String::from( + "id,network,pool_address,protocol_address,protocol_name,token0_address,token0_symbol,token1_address,token1_symbol,reserve0,reserve1,tvl_usd,snapshot_timestamp,block_number,dataset_version_id,created_at\n", + ); + for r in records { + buf.push_str(&format!( + "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n", + r.id, + csv_escape(&r.network), + csv_escape(&r.pool_address), + csv_escape(&r.protocol_address), + r.protocol_name.as_deref().unwrap_or(""), + csv_escape(&r.token0_address), + r.token0_symbol.as_deref().unwrap_or(""), + 
csv_escape(&r.token1_address), + r.token1_symbol.as_deref().unwrap_or(""), + r.reserve0, + r.reserve1, + r.tvl_usd + .as_ref() + .map(|v| v.to_string()) + .unwrap_or_default(), + r.snapshot_timestamp, + r.block_number.map(|v| v.to_string()).unwrap_or_default(), + r.dataset_version_id + .map(|u| u.to_string()) + .unwrap_or_default(), + r.created_at.to_rfc3339(), + )); + } + buf.into_bytes() +} + /// Generate tax-export-friendly CSV from wallet_ledger records. /// /// Columns: Date,Type,Sent_Asset,Sent_Amount,Received_Asset,Received_Amount, @@ -2638,6 +2736,32 @@ async fn run_export_job( }; Ok((body, count, meta)) } + "protocol_events" => { + let records = repo + .export_protocol_events(target_id, network, time_start, time_end) + .await?; + let count = records.len(); + let body = match format { + ExportFormat::Jsonl => { + serialize_to_jsonl(&records).map_err(|e| anyhow::anyhow!("{}", e.message))? + } + ExportFormat::Csv => protocol_events_to_csv(&records), + }; + Ok((body, count, meta)) + } + "pool_snapshots" => { + let records = repo + .export_pool_snapshots(target_id, network, time_start, time_end) + .await?; + let count = records.len(); + let body = match format { + ExportFormat::Jsonl => { + serialize_to_jsonl(&records).map_err(|e| anyhow::anyhow!("{}", e.message))? + } + ExportFormat::Csv => pool_snapshots_to_csv(&records), + }; + Ok((body, count, meta)) + } _ => Err(anyhow::anyhow!("Unknown dataset: {dataset}")), } } @@ -2890,6 +3014,74 @@ async fn hl_market_analytics_handler( Ok(Json(analytics)) } +// --------------------------------------------------------------------------- +// Protocol analytics endpoints (P5-W3) +// --------------------------------------------------------------------------- + +/// Query parameters for the protocol analytics endpoints. +#[derive(Debug, Deserialize)] +struct ProtocolAnalyticsParams { + target_id: Option, + network: Option, + time_start: Option, + time_end: Option, + protocol_address: Option, +} + +/// GET /v1/analytics/protocol/activity — per-protocol event activity analytics. +async fn protocol_activity_handler( + State(state): State>, + Query(params): Query, +) -> Result, AppError> { + validate_date_range(params.time_start, params.time_end)?; + + let events = state + .repo + .export_protocol_events( + params.target_id, + params.network.as_deref(), + params.time_start, + params.time_end, + ) + .await + .map_err(AppError::internal)?; + + let protocol_addr = params.protocol_address.as_deref().unwrap_or_else(|| { + events + .first() + .map(|e| e.protocol_address.as_str()) + .unwrap_or("") + }); + + let activity = + spectraplex_adapters::protocol_analytics::build_protocol_activity(protocol_addr, &events); + + Ok(Json(activity)) +} + +/// GET /v1/analytics/protocol/tvl — TVL analytics from pool snapshots. 
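+///
+/// Accepts the same optional `target_id`, `network`, `time_start`, and
+/// `time_end` query parameters as the activity endpoint; `protocol_address`
+/// is accepted on this route but is not used when aggregating TVL.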
+async fn protocol_tvl_handler( + State(state): State>, + Query(params): Query, +) -> Result, AppError> { + validate_date_range(params.time_start, params.time_end)?; + + let snapshots = state + .repo + .export_pool_snapshots( + params.target_id, + params.network.as_deref(), + params.time_start, + params.time_end, + ) + .await + .map_err(AppError::internal)?; + + let analytics = spectraplex_adapters::protocol_analytics::build_tvl_analytics(&snapshots); + + Ok(Json(analytics)) +} + async fn get_dataset_completeness_handler( State(state): State>, Path(name): Path, @@ -3085,6 +3277,11 @@ mod tests { .route("/v1/forensics/activity", get(forensics_activity_handler)) .route("/v1/analytics/hl/trader", get(hl_trader_analytics_handler)) .route("/v1/analytics/hl/market", get(hl_market_analytics_handler)) + .route( + "/v1/analytics/protocol/activity", + get(protocol_activity_handler), + ) + .route("/v1/analytics/protocol/tvl", get(protocol_tvl_handler)) .layer(middleware::from_fn_with_state( Arc::clone(&state), require_auth, @@ -5075,7 +5272,7 @@ mod tests { #[test] fn test_queryable_datasets_count() { - assert_eq!(QUERYABLE_DATASETS.len(), 10); + assert_eq!(QUERYABLE_DATASETS.len(), 12); } #[test] @@ -5090,6 +5287,8 @@ mod tests { assert!(QUERYABLE_DATASETS.contains(&"balance_history")); assert!(QUERYABLE_DATASETS.contains(&"hl_pnl_summary")); assert!(QUERYABLE_DATASETS.contains(&"hl_trade_history")); + assert!(QUERYABLE_DATASETS.contains(&"protocol_events")); + assert!(QUERYABLE_DATASETS.contains(&"pool_snapshots")); } #[test] @@ -6921,11 +7120,13 @@ mod tests { #[test] fn test_exportable_datasets_includes_gold() { - assert_eq!(EXPORTABLE_DATASETS.len(), 10); + assert_eq!(EXPORTABLE_DATASETS.len(), 12); assert!(EXPORTABLE_DATASETS.contains(&"wallet_ledger")); assert!(EXPORTABLE_DATASETS.contains(&"balance_history")); assert!(EXPORTABLE_DATASETS.contains(&"hl_pnl_summary")); assert!(EXPORTABLE_DATASETS.contains(&"hl_trade_history")); + assert!(EXPORTABLE_DATASETS.contains(&"protocol_events")); + assert!(EXPORTABLE_DATASETS.contains(&"pool_snapshots")); } #[tokio::test] @@ -7205,4 +7406,191 @@ mod tests { assert_get_routed(test_router(), uri).await; } } + + // -- P5-W3: Protocol / TVL Pack tests -- + + #[test] + fn test_protocol_events_in_queryable_datasets() { + assert!(QUERYABLE_DATASETS.contains(&"protocol_events")); + } + + #[test] + fn test_pool_snapshots_in_queryable_datasets() { + assert!(QUERYABLE_DATASETS.contains(&"pool_snapshots")); + } + + #[test] + fn test_protocol_events_in_exportable_datasets() { + assert!(EXPORTABLE_DATASETS.contains(&"protocol_events")); + } + + #[test] + fn test_pool_snapshots_in_exportable_datasets() { + assert!(EXPORTABLE_DATASETS.contains(&"pool_snapshots")); + } + + #[tokio::test] + async fn test_protocol_events_records_endpoint_routed() { + assert_get_routed(test_router(), "/v1/datasets/protocol_events/records").await; + } + + #[tokio::test] + async fn test_pool_snapshots_records_endpoint_routed() { + assert_get_routed(test_router(), "/v1/datasets/pool_snapshots/records").await; + } + + #[tokio::test] + async fn test_protocol_activity_endpoint_routed() { + assert_get_routed(test_router(), "/v1/analytics/protocol/activity").await; + } + + #[tokio::test] + async fn test_protocol_tvl_endpoint_routed() { + assert_get_routed(test_router(), "/v1/analytics/protocol/tvl").await; + } + + #[tokio::test] + async fn test_protocol_activity_requires_auth() { + assert_get_requires_auth(test_router(), "/v1/analytics/protocol/activity").await; + } + + #[tokio::test] + async fn 
test_protocol_tvl_requires_auth() { + assert_get_requires_auth(test_router(), "/v1/analytics/protocol/tvl").await; + } + + #[tokio::test] + async fn test_protocol_events_export_accepted() { + let app = test_router(); + let req = axum::http::Request::builder() + .method("POST") + .uri("/v1/export/dataset") + .header("content-type", "application/json") + .header("authorization", format!("Bearer {}", TEST_API_KEY)) + .body(Body::from( + serde_json::to_string(&serde_json::json!({ + "dataset": "protocol_events" + })) + .unwrap(), + )) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + assert_eq!( + response.status(), + StatusCode::OK, + "protocol_events export should be accepted" + ); + } + + #[tokio::test] + async fn test_pool_snapshots_export_accepted() { + let app = test_router(); + let req = axum::http::Request::builder() + .method("POST") + .uri("/v1/export/dataset") + .header("content-type", "application/json") + .header("authorization", format!("Bearer {}", TEST_API_KEY)) + .body(Body::from( + serde_json::to_string(&serde_json::json!({ + "dataset": "pool_snapshots" + })) + .unwrap(), + )) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + assert_eq!( + response.status(), + StatusCode::OK, + "pool_snapshots export should be accepted" + ); + } + + #[test] + fn test_protocol_events_to_csv_format() { + let records = vec![ProtocolEvent { + id: Uuid::nil(), + network: "ethereum-mainnet".to_string(), + protocol_address: "0xUniswap".to_string(), + protocol_name: Some("Uniswap V3".to_string()), + event_type: "swap".to_string(), + event_details: serde_json::json!({"amount0": "100"}), + pool_address: Some("0xPool".to_string()), + raw_event_id: None, + timestamp: 1700000000, + dataset_version_id: None, + created_at: chrono::DateTime::from_timestamp(1700000000, 0).unwrap(), + }]; + let csv = protocol_events_to_csv(&records); + let output = String::from_utf8(csv).unwrap(); + let lines: Vec<&str> = output.lines().collect(); + assert_eq!(lines.len(), 2); // header + 1 row + assert!(lines[0].contains("protocol_address")); + assert!(lines[0].contains("event_type")); + assert!(lines[0].contains("event_details")); + assert!(lines[1].contains("0xUniswap")); + assert!(lines[1].contains("swap")); + } + + #[test] + fn test_pool_snapshots_to_csv_format() { + let records = vec![PoolSnapshot { + id: Uuid::nil(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPool".to_string(), + protocol_address: "0xProto".to_string(), + protocol_name: Some("Uniswap".to_string()), + token0_address: "0xWETH".to_string(), + token0_symbol: Some("WETH".to_string()), + token1_address: "0xUSDC".to_string(), + token1_symbol: Some("USDC".to_string()), + reserve0: bigdecimal::BigDecimal::from(1000), + reserve1: bigdecimal::BigDecimal::from(2000000), + tvl_usd: Some(bigdecimal::BigDecimal::from(4000000)), + snapshot_timestamp: 1700000000, + block_number: Some(18000000), + dataset_version_id: None, + created_at: chrono::DateTime::from_timestamp(1700000000, 0).unwrap(), + }]; + let csv = pool_snapshots_to_csv(&records); + let output = String::from_utf8(csv).unwrap(); + let lines: Vec<&str> = output.lines().collect(); + assert_eq!(lines.len(), 2); // header + 1 row + assert!(lines[0].contains("pool_address")); + assert!(lines[0].contains("reserve0")); + assert!(lines[0].contains("tvl_usd")); + assert!(lines[1].contains("0xPool")); + assert!(lines[1].contains("4000000")); + } + + // Verify P5-W2 and P5-W1 compatibility remains + #[tokio::test] + async fn 
p5w3_compat_existing_wallet_endpoints_still_routed() { + let wallet_uris = vec![ + "/v1/transactions/SomeWallet123", + "/v1/ledger/SomeWallet123", + "/v1/export/SomeWallet123", + "/v1/balances/SomeWallet123", + "/v1/stats/SomeWallet123", + ]; + for uri in wallet_uris { + assert_get_routed(test_router(), uri).await; + } + } + + #[tokio::test] + async fn p5w3_compat_p5w1_and_p5w2_endpoints_still_routed() { + let uris = vec![ + "/v1/datasets/wallet_ledger/records", + "/v1/datasets/balance_history/records", + "/v1/export/tax", + "/v1/forensics/activity", + "/v1/analytics/hl/trader", + "/v1/analytics/hl/market", + "/v1/datasets/hl_pnl_summary/records", + "/v1/datasets/hl_trade_history/records", + ]; + for uri in uris { + assert_get_routed(test_router(), uri).await; + } + } } diff --git a/core/src/materializer.rs b/core/src/materializer.rs index cc09092..97d505e 100644 --- a/core/src/materializer.rs +++ b/core/src/materializer.rs @@ -50,6 +50,10 @@ pub enum DatasetName { HlPnlSummary, /// Gold-tier Hyperliquid trade history with entry/exit grouping (P5-W2). HlTradeHistory, + /// Gold-tier protocol event records derived from decoded_events (P5-W3). + ProtocolEvents, + /// Gold-tier pool snapshot records derived from decoded_events + token_transfers (P5-W3). + PoolSnapshots, } impl DatasetName { @@ -67,6 +71,8 @@ impl DatasetName { DatasetName::BalanceHistory => "balance_history", DatasetName::HlPnlSummary => "hl_pnl_summary", DatasetName::HlTradeHistory => "hl_trade_history", + DatasetName::ProtocolEvents => "protocol_events", + DatasetName::PoolSnapshots => "pool_snapshots", } } @@ -84,6 +90,8 @@ impl DatasetName { DatasetName::BalanceHistory, DatasetName::HlPnlSummary, DatasetName::HlTradeHistory, + DatasetName::ProtocolEvents, + DatasetName::PoolSnapshots, ] } } @@ -742,6 +750,138 @@ pub struct CoinMarketSummary { pub avg_trade_size: BigDecimal, } +// --------------------------------------------------------------------------- +// Gold Dataset Records (P5-W3): Protocol / TVL +// --------------------------------------------------------------------------- + +/// Gold-tier protocol event record. +/// +/// Derived from Silver `decoded_events` by grouping events by their +/// `program_or_contract` as the protocol_address. Each record represents +/// a significant protocol-level event (swap, mint, burn, liquidity change). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProtocolEvent { + pub id: Uuid, + pub network: String, + /// Contract or program address acting as the protocol identifier. + pub protocol_address: String, + /// Human-readable protocol name (if resolvable). + pub protocol_name: Option, + /// Event classification: "swap", "mint", "burn", "liquidity_added", + /// "liquidity_removed", "transfer", "other". + pub event_type: String, + /// Structured event details (decoded fields snapshot). + pub event_details: serde_json::Value, + /// Pool or pair address involved in the event (if applicable). + pub pool_address: Option, + /// FK to the source decoded_events record. + pub raw_event_id: Option, + /// Unix timestamp of the event. + pub timestamp: i64, + /// FK to dataset_versions. + pub dataset_version_id: Option, + pub created_at: DateTime, +} + +/// Gold-tier pool snapshot record. +/// +/// Derived from Silver `decoded_events` (swap / liquidity events) and +/// `token_transfers` to capture per-pool reserve state at a point in time. 
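+///
+/// The current materializer leaves `tvl_usd` as `None`; a downstream consumer
+/// with a price feed could populate it as roughly
+/// `reserve0 * price0 + reserve1 * price1` (illustrative formula only, not
+/// implemented here).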
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PoolSnapshot { + pub id: Uuid, + pub network: String, + /// Pool or pair contract address. + pub pool_address: String, + /// Protocol address the pool belongs to. + pub protocol_address: String, + /// Human-readable protocol name (if resolvable). + pub protocol_name: Option, + /// Address of the first token in the pair. + pub token0_address: String, + /// Symbol of the first token. + pub token0_symbol: Option, + /// Address of the second token in the pair. + pub token1_address: String, + /// Symbol of the second token. + pub token1_symbol: Option, + /// Reserve amount for token0. + pub reserve0: BigDecimal, + /// Reserve amount for token1. + pub reserve1: BigDecimal, + /// USD-denominated total value locked (nullable — requires price feed). + pub tvl_usd: Option, + /// Unix timestamp of the snapshot. + pub snapshot_timestamp: i64, + /// Block number at snapshot time. + pub block_number: Option, + /// FK to dataset_versions. + pub dataset_version_id: Option, + pub created_at: DateTime, +} + +/// Per-protocol activity analytics response. +/// +/// Aggregated view of protocol interactions — event counts by type, +/// unique participant count, and time range. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProtocolActivity { + /// Protocol contract or program address. + pub protocol_address: String, + /// Event counts grouped by event_type. + pub event_counts_by_type: Vec, + /// Number of unique participant addresses. + pub unique_participants: usize, + /// Total number of protocol events. + pub total_events: usize, + /// Earliest event timestamp. + pub time_start: Option, + /// Latest event timestamp. + pub time_end: Option, +} + +/// Event count for a single event type. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventTypeCount { + pub event_type: String, + pub count: usize, +} + +/// TVL analytics response. +/// +/// Per-pool and aggregate TVL computed from pool_snapshots. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TvlAnalytics { + /// Per-pool TVL snapshots. + pub pools: Vec, + /// Aggregate TVL across all pools (sum of tvl_usd where available). + pub total_tvl: Option, + /// Protocol-level aggregation (grouped by protocol_address). + pub protocols: Vec, +} + +/// TVL summary for a single pool. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PoolTvlSummary { + pub pool_address: String, + pub protocol_address: String, + pub token0_symbol: Option, + pub token1_symbol: Option, + pub reserve0: BigDecimal, + pub reserve1: BigDecimal, + pub tvl_usd: Option, + pub snapshot_timestamp: i64, +} + +/// TVL summary for a protocol (sum across its pools). 
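+///
+/// `total_tvl` is `None` only when none of the protocol's pools carry a
+/// `tvl_usd` value; otherwise it is the sum over the pools that do.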
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProtocolTvlSummary { + pub protocol_address: String, + pub protocol_name: Option, + pub pool_count: usize, + pub total_tvl: Option, +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -753,9 +893,9 @@ mod tests { use strum::IntoEnumIterator; #[test] - fn dataset_name_count_is_eleven() { - assert_eq!(DatasetName::all().len(), 11); - assert_eq!(DatasetName::iter().count(), 11); + fn dataset_name_count_is_thirteen() { + assert_eq!(DatasetName::all().len(), 13); + assert_eq!(DatasetName::iter().count(), 13); } #[test] @@ -775,8 +915,10 @@ mod tests { (DatasetName::BalanceHistory, "\"balance_history\""), (DatasetName::HlPnlSummary, "\"hl_pnl_summary\""), (DatasetName::HlTradeHistory, "\"hl_trade_history\""), + (DatasetName::ProtocolEvents, "\"protocol_events\""), + (DatasetName::PoolSnapshots, "\"pool_snapshots\""), ]; - assert_eq!(cases.len(), 11, "must cover all 11 datasets"); + assert_eq!(cases.len(), 13, "must cover all 13 datasets"); for (variant, expected_json) in cases { let json = serde_json::to_string(&variant).unwrap(); assert_eq!(json, expected_json, "serialize {variant:?}"); @@ -801,6 +943,8 @@ mod tests { assert_eq!(DatasetName::BalanceHistory.to_string(), "balance_history"); assert_eq!(DatasetName::HlPnlSummary.to_string(), "hl_pnl_summary"); assert_eq!(DatasetName::HlTradeHistory.to_string(), "hl_trade_history"); + assert_eq!(DatasetName::ProtocolEvents.to_string(), "protocol_events"); + assert_eq!(DatasetName::PoolSnapshots.to_string(), "pool_snapshots"); } #[test] @@ -833,6 +977,14 @@ mod tests { DatasetName::from_str("hl_trade_history").unwrap(), DatasetName::HlTradeHistory ); + assert_eq!( + DatasetName::from_str("protocol_events").unwrap(), + DatasetName::ProtocolEvents + ); + assert_eq!( + DatasetName::from_str("pool_snapshots").unwrap(), + DatasetName::PoolSnapshots + ); assert!(DatasetName::from_str("unknown_dataset").is_err()); } @@ -1765,4 +1917,110 @@ mod tests { assert_eq!(back.type_breakdown.len(), 1); assert_eq!(back.total_entries, 10); } + + // -- P5-W3: Protocol / TVL record and analytics serde tests -- + + #[test] + fn protocol_event_serde_roundtrip() { + let pe = ProtocolEvent { + id: Uuid::nil(), + network: "ethereum-mainnet".to_string(), + protocol_address: "0xUniswapV3".to_string(), + protocol_name: Some("Uniswap V3".to_string()), + event_type: "swap".to_string(), + event_details: serde_json::json!({"amount0": "100", "amount1": "-50"}), + pool_address: Some("0xPool123".to_string()), + raw_event_id: Some(Uuid::nil()), + timestamp: 1700000000, + dataset_version_id: None, + created_at: chrono::DateTime::from_timestamp(1700000000, 0).unwrap(), + }; + let json = serde_json::to_string(&pe).unwrap(); + let back: ProtocolEvent = serde_json::from_str(&json).unwrap(); + assert_eq!(back.protocol_address, "0xUniswapV3"); + assert_eq!(back.event_type, "swap"); + assert_eq!(back.pool_address.as_deref(), Some("0xPool123")); + } + + #[test] + fn pool_snapshot_serde_roundtrip() { + let ps = PoolSnapshot { + id: Uuid::nil(), + network: "ethereum-mainnet".to_string(), + pool_address: "0xPool123".to_string(), + protocol_address: "0xUniswapV3".to_string(), + protocol_name: Some("Uniswap V3".to_string()), + token0_address: "0xTokenA".to_string(), + token0_symbol: Some("WETH".to_string()), + token1_address: "0xTokenB".to_string(), + token1_symbol: Some("USDC".to_string()), + reserve0: 
BigDecimal::from_str("1000.5").unwrap(), + reserve1: BigDecimal::from_str("2000000").unwrap(), + tvl_usd: Some(BigDecimal::from(4000000)), + snapshot_timestamp: 1700000000, + block_number: Some(18000000), + dataset_version_id: None, + created_at: chrono::DateTime::from_timestamp(1700000000, 0).unwrap(), + }; + let json = serde_json::to_string(&ps).unwrap(); + let back: PoolSnapshot = serde_json::from_str(&json).unwrap(); + assert_eq!(back.pool_address, "0xPool123"); + assert_eq!(back.reserve0, BigDecimal::from_str("1000.5").unwrap()); + assert_eq!(back.tvl_usd, Some(BigDecimal::from(4000000))); + } + + #[test] + fn protocol_activity_serde_roundtrip() { + let pa = ProtocolActivity { + protocol_address: "0xUniswapV3".to_string(), + event_counts_by_type: vec![ + EventTypeCount { + event_type: "swap".to_string(), + count: 100, + }, + EventTypeCount { + event_type: "mint".to_string(), + count: 10, + }, + ], + unique_participants: 50, + total_events: 110, + time_start: Some(1700000000), + time_end: Some(1700100000), + }; + let json = serde_json::to_string(&pa).unwrap(); + let back: ProtocolActivity = serde_json::from_str(&json).unwrap(); + assert_eq!(back.protocol_address, "0xUniswapV3"); + assert_eq!(back.event_counts_by_type.len(), 2); + assert_eq!(back.unique_participants, 50); + assert_eq!(back.total_events, 110); + } + + #[test] + fn tvl_analytics_serde_roundtrip() { + let tvl = TvlAnalytics { + pools: vec![PoolTvlSummary { + pool_address: "0xPool".to_string(), + protocol_address: "0xProto".to_string(), + token0_symbol: Some("WETH".to_string()), + token1_symbol: Some("USDC".to_string()), + reserve0: BigDecimal::from(1000), + reserve1: BigDecimal::from(2000000), + tvl_usd: Some(BigDecimal::from(4000000)), + snapshot_timestamp: 1700000000, + }], + total_tvl: Some(BigDecimal::from(4000000)), + protocols: vec![ProtocolTvlSummary { + protocol_address: "0xProto".to_string(), + protocol_name: Some("Uniswap".to_string()), + pool_count: 1, + total_tvl: Some(BigDecimal::from(4000000)), + }], + }; + let json = serde_json::to_string(&tvl).unwrap(); + let back: TvlAnalytics = serde_json::from_str(&json).unwrap(); + assert_eq!(back.pools.len(), 1); + assert_eq!(back.protocols.len(), 1); + assert_eq!(back.total_tvl, Some(BigDecimal::from(4000000))); + } } diff --git a/migrations/20260311000000_add_protocol_tvl_gold.sql b/migrations/20260311000000_add_protocol_tvl_gold.sql new file mode 100644 index 0000000..5c54ded --- /dev/null +++ b/migrations/20260311000000_add_protocol_tvl_gold.sql @@ -0,0 +1,56 @@ +-- P5-W3: Gold-tier protocol analytics and TVL tables +-- protocol_events: protocol-level events derived from Silver decoded_events +-- pool_snapshots: per-pool reserve snapshots derived from decoded_events + token_transfers + +CREATE TABLE IF NOT EXISTS protocol_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + network TEXT NOT NULL, + protocol_address TEXT NOT NULL, + protocol_name TEXT, + event_type TEXT NOT NULL, + event_details JSONB NOT NULL DEFAULT '{}', + pool_address TEXT, + raw_event_id UUID REFERENCES decoded_events(id), + timestamp BIGINT NOT NULL, + dataset_version_id UUID REFERENCES dataset_versions(id), + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_protocol_events_protocol_address + ON protocol_events (protocol_address); +CREATE INDEX IF NOT EXISTS idx_protocol_events_pool_address + ON protocol_events (pool_address); +CREATE INDEX IF NOT EXISTS idx_protocol_events_network + ON protocol_events (network); +CREATE INDEX IF NOT EXISTS 
idx_protocol_events_timestamp + ON protocol_events (timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_protocol_events_event_type + ON protocol_events (event_type); + +CREATE TABLE IF NOT EXISTS pool_snapshots ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + network TEXT NOT NULL, + pool_address TEXT NOT NULL, + protocol_address TEXT NOT NULL, + protocol_name TEXT, + token0_address TEXT NOT NULL, + token0_symbol TEXT, + token1_address TEXT NOT NULL, + token1_symbol TEXT, + reserve0 NUMERIC NOT NULL DEFAULT 0, + reserve1 NUMERIC NOT NULL DEFAULT 0, + tvl_usd NUMERIC, + snapshot_timestamp BIGINT NOT NULL, + block_number BIGINT, + dataset_version_id UUID REFERENCES dataset_versions(id), + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_pool_snapshots_pool_address + ON pool_snapshots (pool_address); +CREATE INDEX IF NOT EXISTS idx_pool_snapshots_protocol_address + ON pool_snapshots (protocol_address); +CREATE INDEX IF NOT EXISTS idx_pool_snapshots_network + ON pool_snapshots (network); +CREATE INDEX IF NOT EXISTS idx_pool_snapshots_snapshot_timestamp + ON pool_snapshots (snapshot_timestamp DESC);
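+
+-- Illustrative query (not part of the migration): latest snapshot per pool
+-- for one protocol, using PostgreSQL's DISTINCT ON. The address literal is a
+-- placeholder.
+--
+--   SELECT DISTINCT ON (pool_address)
+--          pool_address, reserve0, reserve1, tvl_usd, snapshot_timestamp
+--   FROM pool_snapshots
+--   WHERE protocol_address = '0xProto'
+--   ORDER BY pool_address, snapshot_timestamp DESC;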