diff --git a/Cargo.lock b/Cargo.lock index 762b58f..08d2732 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7043,6 +7043,7 @@ name = "spectraplex-api" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "axum", "bigdecimal", "chrono", diff --git a/README.md b/README.md index f50796c..1a82952 100644 --- a/README.md +++ b/README.md @@ -434,6 +434,58 @@ curl -X POST http://127.0.0.1:3000/v1/export/dataset \ Creates an async export job for a Silver dataset. Supported formats: `jsonl` (default), `csv`. Optional filters: `target_id`, `network`, `time_start`, `time_end`. Exportable datasets: `token_transfers`, `native_balance_deltas`, `decoded_events`, `hl_fills`, `hl_funding`, `positions`. Maximum 100,000 records per export. +#### Sink delivery + +Export jobs support an optional `sink` field to deliver completed export data to an external destination. When a sink is configured, data is delivered to the sink **and** stored in-memory for download via the existing `/v1/export/jobs/:job_id/download` endpoint (backward compatible). 
+ +Supported sink types: + +| Sink type | Description | Required fields | +|-----------|-------------|-----------------| +| `local_file` | Write export data to a local file path | `file_path` | +| `webhook` | POST export data to an HTTP(S) URL | `url`, optional `headers` | +| `database` | Deliver to an external database (not yet implemented) | `connection_string`, `table` | + +Example with local file sink: + +```bash +curl -X POST http://127.0.0.1:3000/v1/export/dataset \ + -H "Authorization: Bearer <api-key>" \ + -H "Content-Type: application/json" \ + -d '{ + "dataset": "token_transfers", + "format": "jsonl", + "sink": { + "sink_type": "local_file", + "file_path": "/tmp/token_transfers_export.jsonl" + } + }' +``` + +Example with webhook sink: + +```bash +curl -X POST http://127.0.0.1:3000/v1/export/dataset \ + -H "Authorization: Bearer <api-key>" \ + -H "Content-Type: application/json" \ + -d '{ + "dataset": "hl_fills", + "format": "csv", + "sink": { + "sink_type": "webhook", + "url": "https://example.com/receive-export", + "headers": {"X-API-Key": "your-key"} + } + }' +``` + +When a sink is configured, the export job status response includes additional fields: + +- `delivered_to`: destination description (file path, URL, etc.) — set on successful delivery +- `delivery_status`: one of `"pending"`, `"delivered"`, or `"failed"` + +Webhook URLs are validated against the same rules as `callback_url` (HTTPS/HTTP only, no private/loopback addresses). Local file paths must not contain `..` path traversal. The `database` sink type is reserved but not yet implemented at runtime.
+ ### GET /v1/export/jobs/:job_id ```bash diff --git a/api/Cargo.toml b/api/Cargo.toml index 42bce3a..e1f06aa 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -16,6 +16,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio-rustls", "uuid", "chrono", "bigdecimal"] } dotenvy = "0.15" anyhow = "1.0" +async-trait = "0.1" bigdecimal = { version = "0.4", features = ["serde"] } uuid = { version = "1.19", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } diff --git a/api/src/main.rs b/api/src/main.rs index 0a750e3..3bc4884 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -20,7 +20,9 @@ use spectraplex_adapters::{ }; use spectraplex_core::config::AppConfig; use spectraplex_core::connector::validate_target; -use spectraplex_core::materializer::{DatasetName, ExportFormat}; +use spectraplex_core::materializer::{ + DatasetName, DeliveryMetadata, DeliveryReceipt, ExportFormat, ExportSink, SinkConfig, SinkType, +}; use spectraplex_core::models::{Chain, ChainIngestor, IndexerCheckpoint, LedgerEntry, Transaction}; use spectraplex_core::v2::{ normalize_evm_address, normalize_solana_address, ChainFamily, DatasetCompleteness, @@ -167,6 +169,12 @@ struct ExportJobStatus { format: String, record_count: Option<usize>, message: Option<String>, + /// Where the export data was delivered (e.g. file path, webhook URL). + #[serde(skip_serializing_if = "Option::is_none")] + delivered_to: Option<String>, + /// Delivery status: "pending", "delivered", "failed", or null if no sink. + #[serde(skip_serializing_if = "Option::is_none")] + delivery_status: Option<String>, } #[derive(Debug, Clone, Serialize)] @@ -376,6 +384,9 @@ struct ExportJobRequest { network: Option<String>, time_start: Option<DateTime<Utc>>, time_end: Option<DateTime<Utc>>, + /// Optional sink config. When provided, export data is delivered to the + /// specified sink in addition to being stored in-memory for download.
+ sink: Option<SinkConfig>, } #[derive(Serialize)] @@ -487,6 +498,155 @@ fn validate_callback_url(url: &str) -> Result<(), AppError> { } } +// --------------------------------------------------------------------------- +// Sink implementations +// --------------------------------------------------------------------------- + +/// Validates a SinkConfig at job creation time. +fn validate_sink_config(config: &SinkConfig) -> Result<(), AppError> { + config + .validate() + .map_err(|e| AppError::bad_request(format!("Invalid sink config: {e}")))?; + + match config.sink_type { + SinkType::Webhook => { + if let Some(ref url) = config.url { + validate_callback_url(url)?; + } + } + SinkType::LocalFile => { + if let Some(ref path) = config.file_path { + // Reject obviously dangerous paths + if path.contains("..") { + return Err(AppError::bad_request( + "file_path must not contain '..' path traversal", + )); + } + } + } + SinkType::Database => { + // Database sink is not yet implemented at runtime. + return Err(AppError::bad_request( + "Database sink is not yet implemented. Use local_file or webhook.", + )); + } + } + Ok(()) +} + +/// Writes export data to a local file path. +struct LocalFileSink { + path: String, +} + +#[async_trait::async_trait] +impl ExportSink for LocalFileSink { + async fn deliver( + &self, + data: &[u8], + _metadata: &DeliveryMetadata, + ) -> Result<DeliveryReceipt, String> { + tokio::fs::write(&self.path, data) + .await + .map_err(|e| format!("Failed to write to {}: {e}", self.path))?; + + Ok(DeliveryReceipt { + sink_type: SinkType::LocalFile, + destination: self.path.clone(), + bytes_written: data.len(), + delivered_at: chrono::Utc::now(), + }) + } +} + +/// POSTs export data to an HTTP(S) webhook URL.
+struct WebhookSink { + url: String, + headers: Option<std::collections::HashMap<String, String>>, +} + +#[async_trait::async_trait] +impl ExportSink for WebhookSink { + async fn deliver( + &self, + data: &[u8], + metadata: &DeliveryMetadata, + ) -> Result<DeliveryReceipt, String> { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .map_err(|e| format!("Failed to build HTTP client: {e}"))?; + + let content_type = match metadata.format.as_str() { + "csv" => "text/csv; charset=utf-8", + _ => "application/x-ndjson", + }; + + let mut req = client + .post(&self.url) + .header("Content-Type", content_type) + .header("X-Export-Dataset", &metadata.dataset) + .header("X-Export-Format", &metadata.format) + .header("X-Export-Record-Count", metadata.record_count.to_string()) + .header("X-Export-Job-Id", metadata.job_id.to_string()); + + if let Some(ref headers) = self.headers { + for (key, value) in headers { + req = req.header(key, value); + } + } + + let response = req + .body(data.to_vec()) + .send() + .await + .map_err(|e| format!("Webhook POST to {} failed: {e}", self.url))?; + + if !response.status().is_success() { + return Err(format!( + "Webhook returned non-success status: {}", + response.status() + )); + } + + Ok(DeliveryReceipt { + sink_type: SinkType::Webhook, + destination: self.url.clone(), + bytes_written: data.len(), + delivered_at: chrono::Utc::now(), + }) + } +} + +/// Builds the appropriate ExportSink from a SinkConfig. +/// Database sink is not yet implemented and returns an error.
+fn build_sink(config: &SinkConfig) -> Result<Box<dyn ExportSink>, String> { + match config.sink_type { + SinkType::LocalFile => { + let path = config + .file_path + .as_ref() + .ok_or("LocalFile sink requires file_path")?; + Ok(Box::new(LocalFileSink { path: path.clone() })) + } + SinkType::Webhook => { + let url = config.url.as_ref().ok_or("Webhook sink requires url")?; + Ok(Box::new(WebhookSink { + url: url.clone(), + headers: config.headers.clone(), + })) + } + SinkType::Database => { + // TODO: Implement runtime Database sink delivery. + // This requires external connection management (separate pool, + // credential handling, schema negotiation) which is out of scope + // for P4-W3. The config parsing and validation are in place; + // runtime delivery will be added in a future packet. + Err("Database sink is not yet implemented at runtime".to_string()) + } + } +} + fn check_wallet_allowed(wallet: &str, allowed: &Option<HashSet<String>>) -> Result<(), AppError> { if let Some(set) = allowed { if !set.contains(&wallet.to_lowercase()) { @@ -1759,6 +1919,11 @@ async fn create_export_job( validate_date_range(req.time_start, req.time_end)?; + // Validate sink config if provided + if let Some(ref sink_config) = req.sink { + validate_sink_config(sink_config)?; + } + let permit = state .job_semaphore .clone() @@ -1775,6 +1940,8 @@ format: format_str.to_string(), record_count: None, message: None, + delivered_to: None, + delivery_status: req.sink.as_ref().map(|_| "pending".to_string()), }; state.export_jobs.write().await.insert( @@ -1792,6 +1959,7 @@ let network = req.network.clone(); let time_start = req.time_start; let time_end = req.time_end; + let sink_config = req.sink.clone(); tokio::spawn(async move { let _permit = permit; @@ -1815,24 +1983,94 @@ ) .await; - let mut exports = state_clone.export_jobs.write().await; - if let Some(entry) = exports.get_mut(&job_id) { - match result { - Ok((body, record_count)) => { -
entry.status.state = JobState::Completed; - entry.status.record_count = Some(record_count); - entry.status.message = Some(format!("Exported {record_count} records")); - entry.data = Some(ExportData { - content_type: content_type_for_format(format), - body, - }); + // Separate the export result handling from sink delivery to avoid + // holding the write lock during potentially slow async I/O. + match result { + Ok((body, record_count)) => { + // Only clone body when a sink needs it after storage + let sink_body = if sink_config.is_some() { + Some(body.clone()) + } else { + None + }; + + // Mark completed and store data (short lock scope) + { + let mut exports = state_clone.export_jobs.write().await; + if let Some(entry) = exports.get_mut(&job_id) { + entry.status.state = JobState::Completed; + entry.status.record_count = Some(record_count); + entry.status.message = Some(format!("Exported {record_count} records")); + // Always store in-memory for download (backward compatibility) + entry.data = Some(ExportData { + content_type: content_type_for_format(format), + body, + }); + } } - Err(e) => { + + // Deliver to sink outside the lock (may involve network I/O) + if let Some(ref sc) = sink_config { + let body = sink_body.expect("sink_body set when sink_config is Some"); + let delivery_result = match build_sink(sc) { + Ok(sink) => { + let meta = DeliveryMetadata { + job_id, + dataset: dataset.clone(), + format: format.to_string(), + record_count, + }; + match sink.deliver(&body, &meta).await { + Ok(receipt) => Ok(receipt.destination), + Err(e) => { + warn!(error = %e, "Sink delivery failed"); + Err(format!( + "Exported {record_count} records, but sink delivery failed: {e}" + )) + } + } + } + Err(e) => { + warn!(error = %e, "Failed to build sink"); + Err(format!( + "Exported {record_count} records, but sink build failed: {e}" + )) + } + }; + + // Brief re-lock to record delivery outcome + let mut exports = state_clone.export_jobs.write().await; + if let Some(entry) = 
exports.get_mut(&job_id) { + match delivery_result { + Ok(destination) => { + entry.status.delivered_to = Some(destination); + entry.status.delivery_status = Some("delivered".to_string()); + } + Err(msg) => { + entry.status.delivery_status = Some("failed".to_string()); + entry.status.message = Some(msg); + } + } + } + } + + // Mark finished time + let mut exports = state_clone.export_jobs.write().await; + if let Some(entry) = exports.get_mut(&job_id) { + entry.finished_at = Some(Instant::now()); + } + } + Err(e) => { + let mut exports = state_clone.export_jobs.write().await; + if let Some(entry) = exports.get_mut(&job_id) { entry.status.state = JobState::Failed; entry.status.message = Some(format!("Export failed: {e}")); + if sink_config.is_some() { + entry.status.delivery_status = Some("failed".to_string()); + } + entry.finished_at = Some(Instant::now()); } } - entry.finished_at = Some(Instant::now()); } }); @@ -4466,6 +4704,8 @@ mod tests { format: "jsonl".to_string(), record_count: Some(2), message: Some("Exported 2 records".to_string()), + delivered_to: None, + delivery_status: None, }, finished_at: Some(Instant::now()), data: Some(ExportData { @@ -4519,6 +4759,8 @@ mod tests { format: "csv".to_string(), record_count: None, message: None, + delivered_to: None, + delivery_status: None, }, finished_at: None, data: None, @@ -4549,6 +4791,8 @@ mod tests { format: "jsonl".to_string(), record_count: None, message: Some("DB connection refused".to_string()), + delivered_to: None, + delivery_status: None, }, finished_at: Some(Instant::now()), data: None, @@ -4579,6 +4823,8 @@ mod tests { format: "csv".to_string(), record_count: Some(42), message: Some("Exported 42 records".to_string()), + delivered_to: None, + delivery_status: None, }, finished_at: Some(Instant::now()), data: None, @@ -4611,6 +4857,8 @@ mod tests { format: "jsonl".to_string(), record_count: None, message: None, + delivered_to: None, + delivery_status: None, }; let json = 
serde_json::to_value(&status).unwrap(); assert_eq!(json["state"], "pending"); @@ -4635,6 +4883,8 @@ mod tests { format: "jsonl".to_string(), record_count: Some(0), message: None, + delivered_to: None, + delivery_status: None, }, finished_at: Some( Instant::now() - std::time::Duration::from_secs(JOB_TTL_SECS + 1), @@ -4652,6 +4902,8 @@ mod tests { format: "csv".to_string(), record_count: None, message: None, + delivered_to: None, + delivery_status: None, }, finished_at: None, data: None, @@ -4770,4 +5022,358 @@ mod tests { "text/csv; charset=utf-8" ); } + + // -- Sink tests (P4-W3) -- + + #[test] + fn test_validate_sink_config_local_file_valid() { + let config = SinkConfig { + sink_type: SinkType::LocalFile, + file_path: Some("/tmp/export.jsonl".to_string()), + url: None, + headers: None, + connection_string: None, + table: None, + }; + assert!(validate_sink_config(&config).is_ok()); + } + + #[test] + fn test_validate_sink_config_local_file_path_traversal() { + let config = SinkConfig { + sink_type: SinkType::LocalFile, + file_path: Some("/tmp/../etc/passwd".to_string()), + url: None, + headers: None, + connection_string: None, + table: None, + }; + let err = validate_sink_config(&config).unwrap_err(); + assert!(err.message.contains("path traversal")); + } + + #[test] + fn test_validate_sink_config_webhook_valid() { + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: Some("https://example.com/hook".to_string()), + headers: None, + connection_string: None, + table: None, + }; + assert!(validate_sink_config(&config).is_ok()); + } + + #[test] + fn test_validate_sink_config_webhook_loopback_rejected() { + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: Some("http://127.0.0.1:8080/hook".to_string()), + headers: None, + connection_string: None, + table: None, + }; + let err = validate_sink_config(&config).unwrap_err(); + assert!(err.message.contains("private")); + } + + #[test] + fn 
test_validate_sink_config_webhook_missing_url() { + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: None, + headers: None, + connection_string: None, + table: None, + }; + let err = validate_sink_config(&config).unwrap_err(); + assert!(err.message.contains("url")); + } + + #[test] + fn test_validate_sink_config_database_rejected() { + let config = SinkConfig { + sink_type: SinkType::Database, + file_path: None, + url: None, + headers: None, + connection_string: Some("postgresql://localhost/exports".to_string()), + table: Some("export_data".to_string()), + }; + let err = validate_sink_config(&config).unwrap_err(); + assert!(err.message.contains("not yet implemented")); + } + + #[test] + fn test_build_sink_local_file() { + let config = SinkConfig { + sink_type: SinkType::LocalFile, + file_path: Some("/tmp/test.jsonl".to_string()), + url: None, + headers: None, + connection_string: None, + table: None, + }; + assert!(build_sink(&config).is_ok()); + } + + #[test] + fn test_build_sink_webhook() { + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: Some("https://example.com/hook".to_string()), + headers: None, + connection_string: None, + table: None, + }; + assert!(build_sink(&config).is_ok()); + } + + #[test] + fn test_build_sink_database_not_implemented() { + let config = SinkConfig { + sink_type: SinkType::Database, + file_path: None, + url: None, + headers: None, + connection_string: Some("postgresql://localhost/db".to_string()), + table: Some("export_data".to_string()), + }; + assert!(build_sink(&config).is_err()); + } + + #[tokio::test] + async fn test_local_file_sink_write_and_readback() { + let dir = std::env::temp_dir().join(format!("spectraplex_test_{}", Uuid::new_v4())); + tokio::fs::create_dir_all(&dir).await.unwrap(); + let path = dir.join("export.jsonl"); + + let sink = LocalFileSink { + path: path.to_str().unwrap().to_string(), + }; + + let data = b"{\"foo\":1}\n{\"bar\":2}\n"; + let 
meta = DeliveryMetadata { + job_id: Uuid::new_v4(), + dataset: "token_transfers".to_string(), + format: "jsonl".to_string(), + record_count: 2, + }; + + let receipt = sink.deliver(data, &meta).await.unwrap(); + assert_eq!(receipt.sink_type, SinkType::LocalFile); + assert_eq!(receipt.bytes_written, data.len()); + assert!(receipt.destination.contains("export.jsonl")); + + // Verify file content + let content = tokio::fs::read_to_string(&path).await.unwrap(); + assert_eq!(content.as_bytes(), data); + + // Cleanup + let _ = tokio::fs::remove_dir_all(&dir).await; + } + + #[tokio::test] + async fn test_local_file_sink_bad_path() { + let sink = LocalFileSink { + path: "/nonexistent_dir_abc123/impossible/export.jsonl".to_string(), + }; + let data = b"test"; + let meta = DeliveryMetadata { + job_id: Uuid::new_v4(), + dataset: "test".to_string(), + format: "jsonl".to_string(), + record_count: 1, + }; + let result = sink.deliver(data, &meta).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_webhook_sink_url_validation_via_sink_config() { + // The webhook URL validation happens at validate_sink_config level. + // Here we verify that invalid URLs are caught properly. 
+ let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: Some("ftp://badprotocol.com/hook".to_string()), + headers: None, + connection_string: None, + table: None, + }; + let err = validate_sink_config(&config).unwrap_err(); + assert!(err.message.contains("HTTP(S)")); + } + + #[tokio::test] + async fn test_export_job_with_sink_accepted() { + let app = test_router(); + let req = axum::http::Request::builder() + .method("POST") + .uri("/v1/export/dataset") + .header("content-type", "application/json") + .header("authorization", format!("Bearer {}", TEST_API_KEY)) + .body(Body::from( + serde_json::to_string(&serde_json::json!({ + "dataset": "token_transfers", + "format": "jsonl", + "sink": { + "sink_type": "local_file", + "file_path": "/tmp/spectraplex_export_test.jsonl" + } + })) + .unwrap(), + )) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let job: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(job["state"], "pending"); + assert_eq!(job["delivery_status"], "pending"); + } + + #[tokio::test] + async fn test_export_job_with_invalid_sink_rejected() { + let app = test_router(); + let req = axum::http::Request::builder() + .method("POST") + .uri("/v1/export/dataset") + .header("content-type", "application/json") + .header("authorization", format!("Bearer {}", TEST_API_KEY)) + .body(Body::from( + serde_json::to_string(&serde_json::json!({ + "dataset": "token_transfers", + "sink": { + "sink_type": "webhook" + } + })) + .unwrap(), + )) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert!(json["error"].as_str().unwrap().contains("url")); + } + + #[tokio::test] + 
async fn test_export_job_with_database_sink_rejected() { + let app = test_router(); + let req = axum::http::Request::builder() + .method("POST") + .uri("/v1/export/dataset") + .header("content-type", "application/json") + .header("authorization", format!("Bearer {}", TEST_API_KEY)) + .body(Body::from( + serde_json::to_string(&serde_json::json!({ + "dataset": "token_transfers", + "sink": { + "sink_type": "database", + "connection_string": "postgresql://localhost/db", + "table": "export_data" + } + })) + .unwrap(), + )) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert!(json["error"] + .as_str() + .unwrap() + .contains("not yet implemented")); + } + + #[tokio::test] + async fn test_export_job_without_sink_still_works() { + // Backward compatibility: no sink field → same behavior as before + let app = test_router(); + let req = axum::http::Request::builder() + .method("POST") + .uri("/v1/export/dataset") + .header("content-type", "application/json") + .header("authorization", format!("Bearer {}", TEST_API_KEY)) + .body(Body::from( + serde_json::to_string(&serde_json::json!({ + "dataset": "token_transfers", + "format": "jsonl" + })) + .unwrap(), + )) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = response.into_body().collect().await.unwrap().to_bytes(); + let job: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(job["state"], "pending"); + // No sink means no delivery_status field (skip_serializing_if) + assert!(job.get("delivery_status").is_none() || job["delivery_status"].is_null()); + assert!(job.get("delivered_to").is_none() || job["delivered_to"].is_null()); + } + + #[test] + fn test_export_job_status_with_delivery_fields_serialization() { + let 
status = ExportJobStatus { + id: Uuid::nil(), + state: JobState::Completed, + dataset: "token_transfers".to_string(), + format: "jsonl".to_string(), + record_count: Some(10), + message: Some("Exported 10 records".to_string()), + delivered_to: Some("/tmp/export.jsonl".to_string()), + delivery_status: Some("delivered".to_string()), + }; + let json = serde_json::to_value(&status).unwrap(); + assert_eq!(json["delivered_to"], "/tmp/export.jsonl"); + assert_eq!(json["delivery_status"], "delivered"); + } + + #[test] + fn test_export_job_status_skip_none_delivery_fields() { + let status = ExportJobStatus { + id: Uuid::nil(), + state: JobState::Pending, + dataset: "token_transfers".to_string(), + format: "jsonl".to_string(), + record_count: None, + message: None, + delivered_to: None, + delivery_status: None, + }; + let json = serde_json::to_value(&status).unwrap(); + // These fields should be absent when None (skip_serializing_if) + assert!(json.get("delivered_to").is_none()); + assert!(json.get("delivery_status").is_none()); + } + + #[tokio::test] + async fn test_export_job_with_webhook_sink_path_traversal_rejected() { + let app = test_router(); + let req = axum::http::Request::builder() + .method("POST") + .uri("/v1/export/dataset") + .header("content-type", "application/json") + .header("authorization", format!("Bearer {}", TEST_API_KEY)) + .body(Body::from( + serde_json::to_string(&serde_json::json!({ + "dataset": "token_transfers", + "sink": { + "sink_type": "local_file", + "file_path": "/tmp/../etc/passwd" + } + })) + .unwrap(), + )) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + } } diff --git a/core/src/materializer.rs b/core/src/materializer.rs index 3e55537..3779336 100644 --- a/core/src/materializer.rs +++ b/core/src/materializer.rs @@ -175,6 +175,138 @@ pub struct RegenerationRequest { pub supersede_current: bool, } +// 
--------------------------------------------------------------------------- +// Sink Types +// --------------------------------------------------------------------------- + +/// Supported export sink types. +/// +/// Determines where completed export data is delivered. The default behavior +/// (no sink) stores data in-memory for download via the existing endpoint. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Display, EnumString)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum SinkType { + /// Write export data to a local file path. + LocalFile, + /// POST export data to an HTTP(S) webhook URL. + Webhook, + /// Deliver export data to an external database (stub — not yet implemented at runtime). + Database, + // ObjectStorage is the planned next extension point. + // It is intentionally not included as a variant yet to avoid dead code, + // but the enum is designed to accommodate it without breaking changes. +} + +/// Configuration for an export sink. +/// +/// Each sink type has its own required fields. The API validates the config +/// at job creation time based on the `sink_type`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SinkConfig { + /// Which sink type to deliver to. + pub sink_type: SinkType, + + // -- LocalFile fields -- + /// Absolute file path for LocalFile sink output. + pub file_path: Option<String>, + + // -- Webhook fields -- + /// HTTP(S) URL for Webhook sink delivery. + pub url: Option<String>, + /// Optional HTTP headers to include with the webhook POST. + pub headers: Option<std::collections::HashMap<String, String>>, + + // -- Database fields -- + /// Connection string for Database sink (e.g. `postgresql://host/db`). + pub connection_string: Option<String>, + /// Target table name for Database sink. + pub table: Option<String>, +} + +impl SinkConfig { + /// Validate that the config has the required fields for its sink type.
+ pub fn validate(&self) -> Result<(), String> { + match self.sink_type { + SinkType::LocalFile => { + let path = self + .file_path + .as_deref() + .ok_or("LocalFile sink requires 'file_path'")?; + if path.is_empty() { + return Err("file_path must not be empty".to_string()); + } + Ok(()) + } + SinkType::Webhook => { + let url = self.url.as_deref().ok_or("Webhook sink requires 'url'")?; + if url.is_empty() { + return Err("url must not be empty".to_string()); + } + Ok(()) + } + SinkType::Database => { + let cs = self + .connection_string + .as_deref() + .ok_or("Database sink requires 'connection_string'")?; + if cs.is_empty() { + return Err("connection_string must not be empty".to_string()); + } + let tbl = self + .table + .as_deref() + .ok_or("Database sink requires 'table'")?; + if tbl.is_empty() { + return Err("table must not be empty".to_string()); + } + Ok(()) + } + } + } +} + +/// Metadata about an export delivery for receipt tracking. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeliveryMetadata { + /// Job ID that produced the export data. + pub job_id: Uuid, + /// Dataset that was exported. + pub dataset: String, + /// Export format (e.g. "jsonl", "csv"). + pub format: String, + /// Number of records in the export. + pub record_count: usize, +} + +/// Receipt returned after successful sink delivery. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeliveryReceipt { + /// Sink type that delivered the data. + pub sink_type: SinkType, + /// Human-readable description of where data was delivered. + pub destination: String, + /// Number of bytes delivered. + pub bytes_written: usize, + /// When delivery completed. + pub delivered_at: DateTime<Utc>, +} + +/// Async trait for export sink implementations. +/// +/// Each sink type implements this trait to deliver serialized export data +/// to its destination. Sinks receive the raw bytes (already serialized as +/// JSONL or CSV) plus metadata about the export job.
+#[async_trait::async_trait] +pub trait ExportSink: Send + Sync { + /// Deliver export data to the sink destination. + async fn deliver( + &self, + data: &[u8], + metadata: &DeliveryMetadata, + ) -> Result<DeliveryReceipt, String>; +} + // --------------------------------------------------------------------------- // Export Format // --------------------------------------------------------------------------- @@ -987,4 +1119,235 @@ mod tests { "HlPositionChange must not have user_id" ); } + + // -- SinkType tests (P4-W3) -- + + #[test] + fn sink_type_serde_roundtrip() { + for (variant, expected) in [ + (SinkType::LocalFile, "\"local_file\""), + (SinkType::Webhook, "\"webhook\""), + (SinkType::Database, "\"database\""), + ] { + let json = serde_json::to_string(&variant).unwrap(); + assert_eq!(json, expected, "serialize {variant:?}"); + let back: SinkType = serde_json::from_str(&json).unwrap(); + assert_eq!(back, variant, "deserialize {expected}"); + } + } + + #[test] + fn sink_type_from_str() { + assert_eq!( + SinkType::from_str("local_file").unwrap(), + SinkType::LocalFile + ); + assert_eq!(SinkType::from_str("webhook").unwrap(), SinkType::Webhook); + assert_eq!(SinkType::from_str("database").unwrap(), SinkType::Database); + assert!(SinkType::from_str("object_storage").is_err()); + assert!(SinkType::from_str("s3").is_err()); + } + + #[test] + fn sink_type_display() { + assert_eq!(SinkType::LocalFile.to_string(), "local_file"); + assert_eq!(SinkType::Webhook.to_string(), "webhook"); + assert_eq!(SinkType::Database.to_string(), "database"); + } + + // -- SinkConfig tests (P4-W3) -- + + #[test] + fn sink_config_local_file_valid() { + let config = SinkConfig { + sink_type: SinkType::LocalFile, + file_path: Some("/tmp/export.jsonl".to_string()), + url: None, + headers: None, + connection_string: None, + table: None, + }; + assert!(config.validate().is_ok()); + } + + #[test] + fn sink_config_local_file_missing_path() { + let config = SinkConfig { + sink_type: SinkType::LocalFile, + file_path:
None, + url: None, + headers: None, + connection_string: None, + table: None, + }; + let err = config.validate().unwrap_err(); + assert!(err.contains("file_path")); + } + + #[test] + fn sink_config_local_file_empty_path() { + let config = SinkConfig { + sink_type: SinkType::LocalFile, + file_path: Some(String::new()), + url: None, + headers: None, + connection_string: None, + table: None, + }; + let err = config.validate().unwrap_err(); + assert!(err.contains("file_path")); + } + + #[test] + fn sink_config_webhook_valid() { + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: Some("https://example.com/hook".to_string()), + headers: None, + connection_string: None, + table: None, + }; + assert!(config.validate().is_ok()); + } + + #[test] + fn sink_config_webhook_missing_url() { + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: None, + headers: None, + connection_string: None, + table: None, + }; + let err = config.validate().unwrap_err(); + assert!(err.contains("url")); + } + + #[test] + fn sink_config_webhook_with_headers() { + let mut headers = std::collections::HashMap::new(); + headers.insert("X-API-Key".to_string(), "secret".to_string()); + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: Some("https://example.com/hook".to_string()), + headers: Some(headers), + connection_string: None, + table: None, + }; + assert!(config.validate().is_ok()); + } + + #[test] + fn sink_config_database_valid() { + let config = SinkConfig { + sink_type: SinkType::Database, + file_path: None, + url: None, + headers: None, + connection_string: Some("postgresql://localhost/exports".to_string()), + table: Some("export_data".to_string()), + }; + assert!(config.validate().is_ok()); + } + + #[test] + fn sink_config_database_missing_connection_string() { + let config = SinkConfig { + sink_type: SinkType::Database, + file_path: None, + url: None, + headers: None, + connection_string: 
None, + table: Some("export_data".to_string()), + }; + let err = config.validate().unwrap_err(); + assert!(err.contains("connection_string")); + } + + #[test] + fn sink_config_database_missing_table() { + let config = SinkConfig { + sink_type: SinkType::Database, + file_path: None, + url: None, + headers: None, + connection_string: Some("postgresql://localhost/exports".to_string()), + table: None, + }; + let err = config.validate().unwrap_err(); + assert!(err.contains("table")); + } + + #[test] + fn sink_config_serde_roundtrip() { + let config = SinkConfig { + sink_type: SinkType::Webhook, + file_path: None, + url: Some("https://example.com/hook".to_string()), + headers: Some({ + let mut h = std::collections::HashMap::new(); + h.insert("Authorization".to_string(), "Bearer tok".to_string()); + h + }), + connection_string: None, + table: None, + }; + let json = serde_json::to_string(&config).unwrap(); + let back: SinkConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(back.sink_type, SinkType::Webhook); + assert_eq!(back.url, Some("https://example.com/hook".to_string())); + assert!(back.headers.is_some()); + } + + #[test] + fn sink_config_local_file_serde_roundtrip() { + let config = SinkConfig { + sink_type: SinkType::LocalFile, + file_path: Some("/tmp/export.csv".to_string()), + url: None, + headers: None, + connection_string: None, + table: None, + }; + let json = serde_json::to_string(&config).unwrap(); + let back: SinkConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(back.sink_type, SinkType::LocalFile); + assert_eq!(back.file_path, Some("/tmp/export.csv".to_string())); + } + + // -- DeliveryMetadata tests -- + + #[test] + fn delivery_metadata_serde_roundtrip() { + let meta = DeliveryMetadata { + job_id: uuid::Uuid::new_v4(), + dataset: "token_transfers".to_string(), + format: "jsonl".to_string(), + record_count: 42, + }; + let json = serde_json::to_string(&meta).unwrap(); + let back: DeliveryMetadata = serde_json::from_str(&json).unwrap(); + 
assert_eq!(back.dataset, "token_transfers"); + assert_eq!(back.record_count, 42); + } + + // -- DeliveryReceipt tests -- + + #[test] + fn delivery_receipt_serde_roundtrip() { + let receipt = DeliveryReceipt { + sink_type: SinkType::LocalFile, + destination: "/tmp/export.jsonl".to_string(), + bytes_written: 1024, + delivered_at: chrono::Utc::now(), + }; + let json = serde_json::to_string(&receipt).unwrap(); + let back: DeliveryReceipt = serde_json::from_str(&json).unwrap(); + assert_eq!(back.sink_type, SinkType::LocalFile); + assert_eq!(back.destination, "/tmp/export.jsonl"); + assert_eq!(back.bytes_written, 1024); + } }