P4-W3: Sink abstraction for export job delivery#124

Merged
user1303836 merged 2 commits into main from p4-w3-sink-abstraction
Mar 9, 2026

@user1303836
Owner

Summary

  • Adds a sink abstraction so export jobs can deliver dataset exports to local files, webhooks, or external databases, with object storage designed as the next extension point.
  • Core types: SinkType enum (LocalFile, Webhook, Database, with ObjectStorage reserved), SinkConfig struct with per-type validation, async ExportSink trait, DeliveryReceipt, and DeliveryMetadata.
  • LocalFileSink writes exports to disk via tokio::fs; WebhookSink POSTs with configurable headers, content-type, and metadata headers.
  • DatabaseSink is config-parseable but cleanly rejected at runtime with a clear message.
  • ExportJobRequest accepts optional sink config; ExportJobStatus gains delivered_to and delivery_status fields (omitted when no sink is used, preserving backward compatibility).
  • Sink delivery runs after serialization completes in the spawned task, alongside in-memory storage for backward-compatible download.
  • README.md updated with sink configuration documentation.

Validation

  • All 772 tests pass (cargo fmt --all --check, cargo clippy --workspace --all-targets -- -D warnings, cargo test --workspace — all green).
  • 35 new tests covering sink type serde, config validation, LocalFileSink write+readback, WebhookSink URL validation, API integration with sink config, path traversal rejection, database sink rejection, and backward compatibility without sink.
  • No wallet-assumption, migration, or compatibility regressions.

Test plan

  • cargo fmt --all --check passes
  • cargo clippy --workspace --all-targets -- -D warnings passes
  • cargo test --workspace passes (772 tests)
  • Reviewer confirms sink types and trait design
  • Reviewer confirms backward compatibility of ExportJobRequest/ExportJobStatus
  • Reviewer confirms README documentation accuracy

🤖 Generated with Claude Code

Add SinkType enum (LocalFile, Webhook, Database), SinkConfig struct, async
ExportSink trait, and DeliveryReceipt/DeliveryMetadata types to core. Implement
LocalFileSink (writes to disk) and WebhookSink (POSTs to validated URL) in the
API server. DatabaseSink is defined as a config-parseable stub with runtime
delivery deferred. Export jobs accept an optional sink config; data is always
stored in-memory for backward-compatible download, and additionally delivered to
the configured sink. ExportJobStatus gains delivered_to and delivery_status
fields. Comprehensive tests cover both sink implementations and API integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: d59c5cadaef6
@user1303836
Owner Author

Remote Review — P4-W3: Sink abstraction

CI status: Formatting ✅ | Clippy ✅ | Tests ✅ | Build ✅ | Cargo Audit ⏳ (independent)

Review against focus areas

1. SinkConfig validation (path traversal / SSRF)

  • Path traversal: contains("..") check rejects .. segments. Adequate for the current scope.
  • Webhook SSRF: Correctly delegates to the existing validate_callback_url which rejects private/loopback IPs and non-HTTP(S) schemes. Tests confirm both loopback and bad-protocol rejection.
  • Database sink is rejected at validation time before any runtime attempt.
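
To make the path-traversal point concrete, here is a minimal sketch comparing the substring check described above with a segment-based check. Both function names are hypothetical and not taken from the PR; only the `contains("..")` behaviour comes from the review. Neither variant resolves symlinks.

```rust
use std::path::{Component, Path};

/// Substring check as described in the review: rejects any path whose
/// text contains "..", including harmless file names such as "a..b".
/// (Hypothetical helper name.)
fn rejects_by_substring(path: &str) -> bool {
    path.contains("..")
}

/// Hypothetical stricter variant: reports only real `..` path segments,
/// so a file name that merely contains two dots is accepted.
fn has_parent_dir_segment(path: &str) -> bool {
    Path::new(path)
        .components()
        .any(|c| matches!(c, Component::ParentDir))
}
```

The substring check is the more conservative of the two: it can reject a legitimate file name like `report..v2.json`, but it never lets a real `..` segment through.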

2. Backward compatibility

  • sink field on ExportJobRequest is Option — omitting it preserves the existing flow.
  • delivered_to and delivery_status use skip_serializing_if = "Option::is_none" — absent from JSON when no sink is configured.
  • Data is always stored in-memory for the existing download endpoint regardless of sink config.
  • Tests explicitly verify no-sink behavior is unchanged.

3. Memory management ✅ (with note)

  • Data is always held in-memory for download even after successful sink delivery. This is an intentional backward-compatibility choice. Acceptable given the existing 100K-record cap and TTL-based cleanup. See non-blocking note below.

4. Error handling

  • Sink delivery failure sets delivery_status = "failed" and appends the error to message, but the job state remains Completed since the export itself succeeded. The export data stays downloadable.
  • build_sink errors are handled the same way.
  • Export serialization failures correctly mark the job as Failed and set delivery_status = "failed" when a sink was configured.
  • Job lifecycle is clean and never left in an inconsistent state.
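
The state rules above can be sketched as a small status type. This is an illustrative model only: the struct layout, method names, and the `"delivered"` status string are assumptions, while `delivered_to`, `delivery_status`, `message`, and the Completed/Failed outcomes come from the PR description.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum JobState {
    Running,
    Completed,
    Failed,
}

/// Hypothetical, simplified stand-in for ExportJobStatus.
#[derive(Debug)]
struct JobStatus {
    state: JobState,
    message: Option<String>,
    delivered_to: Option<String>,
    delivery_status: Option<String>,
}

impl JobStatus {
    fn new() -> Self {
        JobStatus {
            state: JobState::Running,
            message: None,
            delivered_to: None,
            delivery_status: None,
        }
    }

    /// Serialization succeeded: the job is Completed regardless of what
    /// happens to sink delivery afterwards.
    fn record_export_success(&mut self) {
        self.state = JobState::Completed;
    }

    /// Serialization failed: the job is Failed, and delivery is marked
    /// failed only when a sink was configured.
    fn record_export_failure(&mut self, err: &str, sink_configured: bool) {
        self.state = JobState::Failed;
        self.message = Some(err.to_string());
        if sink_configured {
            self.delivery_status = Some("failed".to_string());
        }
    }

    /// Records the sink outcome without touching `state`.
    /// Ok carries the destination, Err carries the error text.
    fn record_delivery(&mut self, outcome: Result<String, String>) {
        match outcome {
            Ok(destination) => {
                self.delivered_to = Some(destination);
                self.delivery_status = Some("delivered".to_string());
            }
            Err(err) => {
                self.delivery_status = Some("failed".to_string());
                // Append the delivery error to any existing message.
                let note = format!("sink delivery failed: {err}");
                self.message = Some(match self.message.take() {
                    Some(existing) => format!("{existing}; {note}"),
                    None => note,
                });
            }
        }
    }
}
```

Keeping `state` out of `record_delivery` is what ensures a failed webhook never turns a successful export into a failed job.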

5. Trait design

  • ExportSink trait in core/ is cleanly generic: deliver(&[u8], &DeliveryMetadata) -> Result<DeliveryReceipt, String>.
  • SinkType enum is designed for non-breaking extension (ObjectStorage comment is clear).
  • DeliveryReceipt and DeliveryMetadata are properly separated and serde-roundtrippable.
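
For readers who have not opened the diff, the trait shape can be sketched roughly as follows. Note the assumptions: the real trait is async, whereas this sketch is synchronous to stay dependency-free; the fields of `DeliveryMetadata` and `DeliveryReceipt` are invented for illustration; and `MemorySink` and `RejectingSink` are hypothetical implementations, not the PR's sinks.

```rust
use std::sync::Mutex;

/// Hypothetical fields; the PR's actual metadata may differ.
struct DeliveryMetadata {
    job_id: String,
    content_type: String,
}

/// Hypothetical fields; the PR's actual receipt may differ.
#[derive(Debug)]
struct DeliveryReceipt {
    destination: String,
    bytes_written: usize,
}

/// Synchronous simplification of the trait signature quoted above.
trait ExportSink {
    fn deliver(&self, data: &[u8], meta: &DeliveryMetadata) -> Result<DeliveryReceipt, String>;
}

/// Hypothetical sink that keeps every payload in memory.
#[derive(Default)]
struct MemorySink {
    stored: Mutex<Vec<Vec<u8>>>,
}

impl ExportSink for MemorySink {
    fn deliver(&self, data: &[u8], meta: &DeliveryMetadata) -> Result<DeliveryReceipt, String> {
        let mut stored = self.stored.lock().map_err(|e| e.to_string())?;
        stored.push(data.to_vec());
        Ok(DeliveryReceipt {
            destination: format!("memory://{}", meta.job_id),
            bytes_written: data.len(),
        })
    }
}

/// Hypothetical sink that always refuses delivery with a clear message,
/// loosely modelled on how the database sink is described.
struct RejectingSink;

impl ExportSink for RejectingSink {
    fn deliver(&self, _data: &[u8], meta: &DeliveryMetadata) -> Result<DeliveryReceipt, String> {
        Err(format!(
            "delivery of {} payloads is not supported by this sink",
            meta.content_type
        ))
    }
}
```

Because the trait takes only bytes and metadata, callers can hold a `Box<dyn ExportSink>` and stay unaware of which sink type was configured.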

6. No wallet-only assumptions

  • Sink types, config, trait, and delivery flow are fully generic over datasets. No wallet references in any sink code.

Verdict

No blocking issues. The remote review pass is clear — this packet is ready to merge from the PR-comment review perspective.

Non-blocking notes for future iterations

  1. Path validation hardening — Consider additionally restricting LocalFile paths to an allow-list of directories or using Path::canonicalize to resolve symlinks post-creation. The .. check is adequate for now.
  2. Memory optimization for sink-delivered jobs — When a sink is configured and delivery succeeds, the in-memory copy could be dropped earlier (or opt-in skipped) to reduce memory pressure. Not urgent given the 100K-record cap.
  3. Error type: ExportSink::deliver returns Result<_, String>. A typed error enum would be more idiomatic for future sinks (e.g., retryable vs permanent failures), but String is fine for this scope.
  4. Streaming delivery — The &[u8] interface requires full in-memory payload. For a future S3 sink, an AsyncRead-based interface would be more memory-efficient. Pre-existing constraint from the export flow, not introduced here.
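
As a rough illustration of note 3, a typed error could look like the sketch below. `SinkError`, its variants, and `deliver_with_retry` are hypothetical names, not part of this PR; the retry helper only shows why the retryable/permanent distinction is useful.

```rust
use std::fmt;

/// Hypothetical typed replacement for the String error.
#[derive(Debug, Clone, PartialEq)]
enum SinkError {
    /// Transient failure, e.g. a timeout or a 5xx response.
    Retryable(String),
    /// Failure that will not succeed on retry, e.g. invalid config.
    Permanent(String),
}

impl SinkError {
    fn is_retryable(&self) -> bool {
        matches!(self, SinkError::Retryable(_))
    }
}

impl fmt::Display for SinkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SinkError::Retryable(msg) => write!(f, "retryable sink error: {msg}"),
            SinkError::Permanent(msg) => write!(f, "permanent sink error: {msg}"),
        }
    }
}

impl std::error::Error for SinkError {}

/// Calls `attempt` with the 1-based attempt number until it succeeds,
/// returns a permanent error, or `max_attempts` is reached.
/// Always makes at least one attempt.
fn deliver_with_retry<T>(
    max_attempts: u32,
    mut attempt: impl FnMut(u32) -> Result<T, SinkError>,
) -> Result<T, SinkError> {
    let mut n = 1;
    loop {
        match attempt(n) {
            Err(e) if e.is_retryable() && n < max_attempts => n += 1,
            other => return other,
        }
    }
}
```

With a plain String error the caller cannot make this decision without parsing message text, which is the main argument for the enum.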

Sink delivery (especially webhook POST with up to 30s timeout) was performed
while holding the export_jobs RwLock write guard, blocking all other export
operations. Restructure to: acquire lock briefly to mark completed and store
data, release lock, perform sink delivery, then briefly re-lock to record
delivery outcome. Only clones the export body when a sink is configured.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: d0e774e443dc
@user1303836
Owner Author

Review feedback fix applied

Blocking issue from local review: Sink delivery was holding the export_jobs write lock during potentially slow async I/O (up to 30s for webhook timeouts), which would block all other export job operations.

Fix (commit 4dbcd9d): Restructured the export task to use short lock scopes:

  1. Brief lock to mark job as Completed and store data in-memory
  2. Lock released — sink delivery runs without holding any lock
  3. Brief re-lock to record delivery outcome (delivered/failed)
  4. Brief re-lock to set finished_at

Only clones the export body when a sink is actually configured (no performance cost for the no-sink path).
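
The short-lock-scope pattern can be sketched as follows. This is a simplified, synchronous model under stated assumptions: it uses `std::sync::RwLock` where the server presumably uses an async lock, the `Job` struct and `finish_job` function are hypothetical, and the `finished_at` step is omitted.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical, simplified job record.
#[derive(Debug, Default)]
struct Job {
    completed: bool,
    data: Option<Vec<u8>>,
    delivery_status: Option<String>,
}

type Jobs = Arc<RwLock<HashMap<u64, Job>>>;

/// Marks the job completed, then delivers to the sink (if any) without
/// holding the lock, then records the delivery outcome.
fn finish_job(
    jobs: &Jobs,
    id: u64,
    body: Vec<u8>,
    sink: Option<&dyn Fn(&[u8]) -> Result<(), String>>,
) {
    // Clone the body only when a sink is configured.
    let for_sink = sink.map(|_| body.clone());

    {
        // Step 1: brief lock to mark completion and store the data.
        let mut guard = jobs.write().unwrap();
        let job = guard.entry(id).or_default();
        job.completed = true;
        job.data = Some(body);
    } // Step 2: write guard dropped here, before any slow I/O.

    if let (Some(deliver), Some(bytes)) = (sink, for_sink) {
        // Potentially slow delivery runs with no lock held.
        let outcome = deliver(&bytes);

        // Step 3: brief re-lock to record the outcome.
        let status = if outcome.is_ok() { "delivered" } else { "failed" };
        let mut guard = jobs.write().unwrap();
        if let Some(job) = guard.get_mut(&id) {
            job.delivery_status = Some(status.to_string());
        }
    }
}
```

The inner block exists only to bound the guard's lifetime; other readers and writers can make progress for the whole duration of the sink call.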

All 772 tests pass. CI green (fmt ✅, clippy ✅, tests ✅, build ✅, audit ✅). Ready to merge.

@user1303836 user1303836 merged commit a2734f5 into main Mar 9, 2026
5 checks passed