diff --git a/.cursor/rules/architecture.mdc b/.cursor/rules/architecture.mdc new file mode 100644 index 0000000..356229c --- /dev/null +++ b/.cursor/rules/architecture.mdc @@ -0,0 +1,73 @@ +--- +description: +globs: *.rs +alwaysApply: false +--- +# Architecture Rules + +## Core Principles + +1. Each molecule must be a Working Piece of Software: + - Contains everything needed to run independently + - Represents a single domain concept + - Can be executed standalone or as part of a larger system + - Hides internal implementation details + +2. Molecule Structure: + - Must have a `main.rs` for standalone execution + - Contains all related domain logic in one place + - Uses Rust modules for internal organization + - No artificial layering within molecules + +3. Shared Code: + - Domain-agnostic utilities in `lib/` + - No domain logic in shared code + - Clear ownership of domain-specific code + - Unidirectional dependencies only + +4. Module Integration: + - Molecules can be combined into larger features + - No circular dependencies allowed + - Clear dependency graph + - Explicit integration points + +5. Feature Removal: + - Molecules can be removed without side effects + - No scattered references across codebase + - Clean removal process + - No feature flags needed + +## Directory Structure + +``` +modules/ +├── molecule/ +│ ├── src/ +│ │ ├── lib.rs // Public interface +│ │ ├── molecule.rs // Core domain logic +│ │ └── schema.rs // Data definitions +│ ├── Cargo.toml // Dependencies +│ └── main.rs // Standalone runner +│ +├── aggregator/ +│ ├── src/ +│ │ └── main.rs // Combines molecules +│ └── Cargo.toml +│ +└── lib/ + ├── src/ + │ ├── lib.rs + │ └── utils.rs + └── Cargo.toml +``` + +## Naming Conventions + +1. Molecules must be named after their domain concept +2. Files must reflect their primary responsibility +3. No generic names like "controller", "model", "view" +4. Use clear, domain-specific terminology +5. 
Follow Rust naming conventions: + - `snake_case` for modules and functions + - `UpperCamelCase` for types and traits + - `SCREAMING_SNAKE_CASE` for constants diff --git a/.cursor/rules/code-style.mdc b/.cursor/rules/code-style.mdc new file mode 100644 index 0000000..5bc2600 --- /dev/null +++ b/.cursor/rules/code-style.mdc @@ -0,0 +1,104 @@ +--- +description: +globs: *.rs +alwaysApply: false +--- +# Code Style Rules + +## Organization + +1. Alphabetical ordering: + - Sort imports alphabetically + - Sort exports alphabetically + - Sort struct fields alphabetically + - Sort enum variants alphabetically + - Sort function parameters alphabetically + +2. File structure: + - Module declarations at the top + - Imports after module declarations + - Types and traits next + - Constants after types + - Functions after constants + - Tests at the bottom + +3. Function organization: + - Public functions first + - Private functions after + - Helper functions last + +## Naming + +1. Use domain-specific names: + - Avoid generic terms + - Names should reflect business concepts + - No technical implementation names + +2. Function names: + - Start with verbs + - Be specific about action + - Include domain context + - Use `snake_case` + +3. Variable names: + - Be descriptive + - Include type information + - Use domain terminology + - Use `snake_case` + +## Abstractions + +1. Only create abstractions for: + - Domain concepts + - DRY violations + - No other reasons + +2. Keep abstractions: + - Close to their usage + - Simple and focused + - Well-documented + +3. Avoid: + - Premature abstraction + - Over-engineering + - Unnecessary traits + +## Molecule Structure + +1. Keep related code together: + - Domain logic in one place + - No artificial layering + - Clear boundaries + - Use Rust modules for organization + +2. Minimize dependencies: + - Only import what's needed + - No circular dependencies + - Clear dependency direction + - Use feature flags in Cargo.toml + +3. 
Export only what's needed: + - Hide implementation details + - Clear public interface + - Domain-focused exports + - Use `pub(crate)` for internal visibility + +## Rust-Specific Guidelines + +1. Error handling: + - Use custom error types + - Implement `std::error::Error` + - Use `?` operator appropriately + - Provide context in error messages + +2. Type safety: + - Use strong types + - Avoid `String` when possible + - Use newtypes for domain concepts + - Leverage the type system + +3. Performance: + - Use references appropriately + - Avoid unnecessary cloning + - Use appropriate data structures + - Profile when needed diff --git a/.cursor/rules/documentation.mdc b/.cursor/rules/documentation.mdc new file mode 100644 index 0000000..22e006a --- /dev/null +++ b/.cursor/rules/documentation.mdc @@ -0,0 +1,98 @@ +--- +description: +globs: *.rs +alwaysApply: false +--- +# Documentation Rules + +## Code Documentation + +1. Function documentation: + - Purpose and intent + - Parameters and return values + - Usage examples + - No implementation details + - Use `///` for doc comments + +2. Type documentation: + - Domain meaning + - Usage context + - Constraints + - Examples + - Use `///` for doc comments + +3. Module documentation: + - Purpose and scope + - Dependencies + - Usage guidelines + - Examples + - Use `//!` for module-level docs + +## Molecule Documentation + +1. Core documentation: + - Domain concept + - Standalone capabilities + - Integration points + - Dependencies + - Cargo.toml metadata + +2. Molecule structure: + - File organization + - Key components + - Data flow + - External interfaces + - Module hierarchy + +3. Molecule usage: + - How to run standalone + - How to integrate + - Configuration options + - Examples + - Feature flags + +## System Documentation + +1. Architecture overview: + - Molecule relationships + - Dependency graph + - Integration patterns + - Deployment options + - Workspace structure + +2. 
Shared resources: + - Library usage + - Common utilities + - Cross-cutting concerns + - Best practices + - Cargo workspace + +3. Development workflow: + - Adding new molecules + - Removing molecules + - Testing strategy + - Deployment process + - Cargo commands + +## Documentation Standards + +1. Documentation location: + - Code comments for implementation + - README.md for modules + - Architecture docs for system + - API docs for interfaces + - Cargo doc for API docs + +2. Documentation format: + - Clear and concise + - Use markdown + - Include examples + - Keep up to date + - Follow rustdoc conventions + +3. Documentation maintenance: + - Update with code changes + - Review regularly + - Remove obsolete docs + - Version control docs + - Run `cargo doc` regularly diff --git a/.cursor/rules/testing.mdc b/.cursor/rules/testing.mdc new file mode 100644 index 0000000..3eb4189 --- /dev/null +++ b/.cursor/rules/testing.mdc @@ -0,0 +1,106 @@ +--- +description: +globs: *.rs +alwaysApply: false +--- +# Testing Rules + +## Core Principles + +1. Each molecule must be independently testable: + - No dependencies on other molecules + - Use in-memory databases for testing + - Mock external services + - Use feature flags for test dependencies + +2. Test organization: + - Unit tests in `#[cfg(test)]` modules + - Integration tests in `tests/` directory + - End-to-end tests for aggregators + - Use test attributes appropriately + +3. Test naming: + - Describe the scenario + - Include expected outcome + - Use domain terminology + - Follow Rust test naming conventions + +## Test Structure + +1. Test file organization: + - Group related tests + - Alphabetical ordering within groups + - Clear test descriptions + - Use test modules for organization + +2. Test setup: + - Minimal setup required + - Clear test data + - Isolated test environment + - Use test fixtures when needed + +3. 
Test assertions: + - One concept per test + - Clear failure messages + - Domain-specific assertions + - Use appropriate assertion macros + +## Testing Practices + +1. Test coverage: + - Core business logic + - Edge cases + - Error conditions + - Use `cargo tarpaulin` for coverage + +2. Test data: + - Use realistic data + - Include edge cases + - Document test data purpose + - Use const for test data + +3. Test maintenance: + - Keep tests simple + - Update tests with code changes + - Remove obsolete tests + - Run tests in CI + +## Molecule Testing + +1. Standalone testing: + - Test molecule in isolation + - Use in-memory services + - No external dependencies + - Use test features in Cargo.toml + +2. Integration testing: + - Test molecule boundaries + - Verify integration points + - Check dependency handling + - Use test helpers + +3. Aggregator testing: + - Test molecule combinations + - Verify system behavior + - Check communication flow + - Use workspace tests + +## Rust-Specific Testing + +1. Test attributes: + - Use `#[test]` for test functions + - Use `#[cfg(test)]` for test modules + - Use `#[ignore]` for slow tests + - Use `#[should_panic]` for panic tests + +2. Test utilities: + - Use `assert!` for boolean checks + - Use `assert_eq!` for equality + - Use `assert_ne!` for inequality + - Use `assert!(matches!(...))` for pattern matching (`assert_matches!` requires nightly or the `assert_matches` crate) + +3. Test organization: + - Use `mod tests` for unit tests + - Use `tests/` directory for integration + - Use `benches/` for benchmarks + - Use `examples/` for examples diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md deleted file mode 100644 index bab372d..0000000 --- a/ARCHITECTURE.md +++ /dev/null @@ -1,241 +0,0 @@ -# Molecular Architecture for Workflow Automation Tool - -## Core Molecules (Domain Concepts) - -### 1. 
`workflow-engine/` - The Heart of Execution -**Domain:** Workflow execution and orchestration -``` -workflow-engine/ -├── workflow-engine.ts // Core execution logic -├── wasm-runner.ts // Wasmtime integration -├── execution-context.ts // Runtime state management -├── schema.sql // Workflow definitions, runs, states -└── main.ts // Standalone runner: node run workflow-engine input.json -``` - -**Responsibilities:** -- Load and validate workflow definitions -- Execute WASM components via wasmtime -- Manage execution state and context -- Handle data flow between nodes -- **Runnable alone:** Can execute workflows headlessly for CI/CD, testing, or server-side automation - -### 2. `component-builder/` - WASM Component Management -**Domain:** Building and managing workflow components -``` -component-builder/ -├── component-builder.ts // Cargo component integration -├── wasm-registry.ts // Component storage and versioning -├── component-validator.ts // Validate WASM interfaces -├── schema.sql // Component metadata, versions -└── main.ts // Standalone: build and test components -``` - -**Responsibilities:** -- Interface with `cargo component` to build WASM modules -- Store and version workflow components -- Validate component interfaces and compatibility -- **Runnable alone:** CLI tool for component development and testing - -### 3. `workflow-designer/` - Visual Flow Editor -**Domain:** Workflow creation and editing interface -``` -workflow-designer/ -├── workflow-designer.ts // Core designer logic -├── svelteflow-adapter.ts // SvelteFlow integration -├── node-palette.ts // Available components UI -├── canvas-state.ts // Designer state management -└── main.ts // Standalone designer app -``` - -**Responsibilities:** -- Visual workflow creation using SvelteFlow -- Node palette management -- Canvas state and validation -- Export workflow definitions -- **Runnable alone:** Pure designer interface for workflow creation - -### 4. 
`execution-monitor/` - Runtime Observability -**Domain:** Workflow execution monitoring and debugging -``` -execution-monitor/ -├── execution-monitor.ts // Real-time execution tracking -├── trace-collector.ts // Execution trace management -├── output-manager.ts // Handle workflow outputs -├── debug-interface.ts // Debugging tools -├── schema.sql // Traces, outputs, metrics -└── main.ts // Standalone monitoring dashboard -``` - -**Responsibilities:** -- Collect and display execution traces -- Manage workflow outputs and artifacts -- Provide debugging interface -- Real-time execution monitoring -- **Runnable alone:** Monitoring dashboard for production workflows - -### 5. `node-configurator/` - Node Settings Management -**Domain:** Individual node configuration and validation -``` -node-configurator/ -├── node-configurator.ts // Node configuration logic -├── settings-validator.ts // Validate node settings -├── schema-generator.ts // Generate settings schemas from WASM -├── ui-generator.ts // Dynamic settings UI -└── main.ts // Standalone node config tool -``` - -**Responsibilities:** -- Dynamic settings UI generation based on component schemas -- Settings validation and serialization -- Schema introspection from WASM components -- **Runnable alone:** Component configuration and testing tool - -### 6. `ai-assistant/` - Intelligent Workflow Help -**Domain:** AI-powered workflow assistance -``` -ai-assistant/ -├── ai-assistant.ts // Core AI integration -├── workflow-analyzer.ts // Analyze and suggest improvements -├── component-suggester.ts // Suggest relevant components -├── prompt-manager.ts // Manage AI prompts and context -└── main.ts // Standalone AI chat interface -``` - -**Responsibilities:** -- Analyze workflows and suggest improvements -- Help users find relevant components -- Generate workflow templates -- Provide contextual help -- **Runnable alone:** AI assistant for workflow development - -### 7. 
`collaboration-hub/` - Real-time Collaboration -**Domain:** Multi-user workflow collaboration -``` -collaboration-hub/ -├── collaboration-hub.ts // Core collaboration logic -├── websocket-server.ts // Real-time communication -├── conflict-resolver.ts // Handle concurrent edits -├── presence-manager.ts // User presence tracking -├── schema.sql // Sessions, presence, changes -└── main.ts // Standalone collaboration server -``` - -**Responsibilities:** -- WebSocket-based real-time collaboration -- Conflict resolution for concurrent edits -- User presence and cursor tracking -- Change synchronization -- **Runnable alone:** Collaboration server for team workflows - -## Shared Libraries (`lib/`) - -``` -lib/ -├── wasm-interface.ts // Common WASM component interface -├── workflow-schema.ts // Workflow definition schemas -├── websocket-client.ts // Shared WebSocket utilities -├── database-service.ts // Configurable DB service (in-memory/remote) -├── event-bus.ts // Cross-module event system -└── auth-service.ts // Authentication utilities -``` - -## Aggregator Modules - -### `workflow-studio/` - Full Development Environment -``` -workflow-studio/ -└── main.ts // Combines designer + configurator + ai-assistant + collaboration -``` - -### `workflow-server/` - Production Runtime -``` -workflow-server/ -└── main.ts // Combines engine + monitor + collaboration for production -``` - -### `workflow-cli/` - Command Line Interface -``` -workflow-cli/ -└── main.ts // Combines engine + builder for headless operations -``` - -## Dependency Graph - -``` -workflow-studio → workflow-designer - → node-configurator - → ai-assistant - → collaboration-hub - -workflow-server → workflow-engine - → execution-monitor - → collaboration-hub - -workflow-cli → workflow-engine - → component-builder - -All modules → lib/* (shared utilities) -``` - -## Key Benefits of This Structure - -### 1. 
**Independent Development & Testing** -- Test component building: `node run component-builder some-component.rs` -- Test workflow execution: `node run workflow-engine my-workflow.json` -- Test designer UI: `node run workflow-designer` (opens browser) -- Test monitoring: `node run execution-monitor` (dashboard) - -### 2. **Flexible Deployment** -- **Development:** Run `workflow-studio` with all features -- **Production:** Run `workflow-server` for execution + monitoring -- **CI/CD:** Use `workflow-cli` for automated testing -- **Microservice Migration:** Each molecule can become a separate service - -### 3. **Clean Feature Removal** -Don't need AI assistance? Delete `ai-assistant/` folder and remove from aggregators. -Don't need collaboration? Delete `collaboration-hub/` folder. - -### 4. **Domain-Focused Testing** -```javascript -// Test workflow execution without UI complexity -test('workflow executes correctly', async () => { - const db = DatabaseService('in-memory'); - const engine = new WorkflowEngine(db); - - const result = await engine.execute(simpleWorkflow); - expect(result.status).toBe('completed'); -}); -``` - -### 5. **Technology Flexibility** -- Swap SvelteFlow for React Flow? Only affects `workflow-designer/` -- Change from wasmtime to another WASM runtime? Only affects `workflow-engine/` -- Different AI provider? 
Only affects `ai-assistant/` - -## Integration Points - -### Event-Driven Communication -```javascript -// workflow-designer publishes workflow changes -EventBus.publish('workflow.changed', workflowDefinition); - -// collaboration-hub listens and broadcasts to other users -EventBus.subscribe('workflow.changed', (workflow) => { - WebSocketServer.broadcast('workflow.update', workflow); -}); -``` - -### Shared Schemas -```javascript -// lib/workflow-schema.ts defines the contract -interface WorkflowDefinition { - nodes: WorkflowNode[]; - connections: Connection[]; - metadata: WorkflowMetadata; -} - -// All modules use the same schema -``` - -This architecture gives you the "microservices feel" with monolith simplicity. Each domain concept is fully self-contained, testable, and runnable independently, while still being able to compose into a complete workflow automation platform. diff --git a/Cargo.lock b/Cargo.lock index ce01dfb..48549e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -29,6 +38,12 @@ version = "0.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9d4ee0d472d1cd2e28c97dfa124b3d8d992e10eb0a035f33f5d12e3a177ba3b" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -100,28 +115,6 @@ version = "1.4.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223" -[[package]] -name = "async-stream" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "async-trait" version = "0.1.88" @@ -160,6 +153,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.22.1" @@ -303,6 +302,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "clap" version = "4.5.38" @@ -355,6 +369,26 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "const_format" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "126f97965c8ad46d6d9163268ff28432e8f6a1196a55578867832e3049df63dd" 
+dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d57c2eccfb16dbac1f4e61e206105db5820c9d26c3c472bc17c774259ef7744" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -569,6 +603,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "debugid" version = "0.8.0" @@ -578,6 +647,37 @@ dependencies = [ "uuid", ] +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "digest" version = "0.10.7" @@ -586,6 +686,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -667,29 +768,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "engine" -version = "0.1.0" -dependencies = [ - "async-stream", - "clap", - "petgraph", - "reqwest", - "serde", - "serde_json", - "serde_yaml", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "tracing", - "tracing-subscriber", - "url", - "wasm-wave", - "wasmtime", - "wasmtime-wasi", - "wasmtime-wasi-http", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -729,6 +807,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "file-source" +version = "0.1.0" +dependencies = [ + "clap", + "oci-client", + "regex", + "reqwest", + "serde", + "thiserror 2.0.12", + "tokio", + "url", +] + [[package]] name = "fixedbitset" version = "0.5.7" @@ -818,6 +910,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -837,10 +940,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -898,6 +1005,18 @@ 
dependencies = [ "wasi 0.14.2+wasi-0.2.4", ] +[[package]] +name = "getset" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3586f256131df87204eb733da72e3d3eb4f343c639f4b7be279ac7c48baeafe" +dependencies = [ + "proc-macro-error2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "gimli" version = "0.31.1" @@ -946,6 +1065,15 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "1.3.1" @@ -957,6 +1085,15 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-auth" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "150fa4a9462ef926824cf4519c84ed652ca8f4fbae34cb8af045b5cbcaf98822" +dependencies = [ + "memchr", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1182,6 +1319,12 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.0.3" @@ -1297,6 +1440,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jwt" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6204285f77fe7d9784db3fdc449ecce1a0114927a51d5a41c4c7a292011c015f" +dependencies = [ + "base64 0.13.1", + "crypto-common", + "digest", + "hmac", + "serde", + "serde_json", + "sha2", +] + [[package]] name = "lazy_static" version = 
"1.5.0" @@ -1487,6 +1645,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.7" @@ -1499,6 +1666,60 @@ dependencies = [ "memchr", ] +[[package]] +name = "oci-client" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b74df13319e08bc386d333d3dc289c774c88cc543cae31f5347db07b5ec2172" +dependencies = [ + "bytes", + "chrono", + "futures-util", + "http", + "http-auth", + "jwt", + "lazy_static", + "oci-spec", + "olpc-cjson", + "regex", + "reqwest", + "serde", + "serde_json", + "sha2", + "thiserror 2.0.12", + "tokio", + "tracing", + "unicase", +] + +[[package]] +name = "oci-spec" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e9beda9d92fac7bf4904c34c83340ef1024159faee67179a04e0277523da33" +dependencies = [ + "const_format", + "derive_builder", + "getset", + "regex", + "serde", + "serde_json", + "strum", + "strum_macros", + "thiserror 2.0.12", +] + +[[package]] +name = "olpc-cjson" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "696183c9b5fe81a7715d074fd632e8bd46f4ccc0231a3ed7fc580a80de5f7083" +dependencies = [ + "serde", + "serde_json", + "unicode-normalization", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1594,6 +1815,7 @@ dependencies = [ "hashbrown", "indexmap", "serde", + "serde_derive", ] [[package]] @@ -1644,6 +1866,28 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = 
"proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -1772,6 +2016,29 @@ dependencies = [ "smallvec", ] +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.8.5" @@ -1784,7 +2051,7 @@ version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "encoding_rs", "futures-core", @@ -1813,11 +2080,13 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", "tower", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "windows-registry", ] @@ -1836,6 +2105,22 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "runtime" +version = "0.1.0" +dependencies = [ + "clap", + "file-source", + "petgraph", + "thiserror 2.0.12", + "tokio", + "tracing-subscriber", + "wasmtime", + "wasmtime-wasi", + "wasmtime-wasi-http", + "workflow", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2166,6 +2451,25 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" + +[[package]] +name = "strum_macros" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" @@ -2328,6 +2632,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.45.0" @@ -2388,17 +2707,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.15" @@ -2486,6 +2794,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2560,12 +2869,27 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "unicase" 
+version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-width" version = "0.2.0" @@ -2757,6 +3081,19 @@ dependencies = [ "wasmparser 0.230.0", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-wave" version = "0.228.0" @@ -2878,7 +3215,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa71477c72baa24ae6ae64e7bca6831d3232b01fda24693311733f1e19136b68" dependencies = [ "anyhow", - "base64", + "base64 0.22.1", "directories-next", "log", "postcard", @@ -3559,6 +3896,19 @@ dependencies = [ "wast 35.0.2", ] +[[package]] +name = "workflow" +version = "0.1.0" +dependencies = [ + "clap", + "file-source", + "serde", + "serde_json", + "serde_yaml", + "thiserror 2.0.12", + "tokio", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index d767148..6ae78a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,25 @@ [workspace] members = ["crates/*"] resolver = "2" + +[workspace.dependencies] +async-stream = "0.3.6" +clap = { version = "4.5", features = ["derive"] } +futures = "0.3" +oci-client = "0.15.0" +petgraph = { version = "0.8.1", features = ["serde-1"] 
} +regex = "1.10" +reqwest = { version = "0.12.15", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_yaml = "0.9.34" +thiserror = "2.0.12" +tokio = { version = "1.0", features = ["full"] } +tokio-stream = "0.1.17" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +url = { version = "2.5.4", features = ["serde"] } +wasm-wave = "0.228.0" +wasmtime = { version = "32.0", features = ["wave"] } +wasmtime-wasi = "32.0" +wasmtime-wasi-http = "32.0" diff --git a/PHILOSOPHY.md b/PHILOSOPHY.md deleted file mode 100644 index f5d92f2..0000000 --- a/PHILOSOPHY.md +++ /dev/null @@ -1,13 +0,0 @@ -# 1. Screaming architecture - -Refuse names like controllers, models, views etc. The file names and structures should scream about their intent. If this is a tool about user management, you should have names like user, accesses, roles etc. - -# 2. Low abstractions - -An abstraction is a trait, interface, type, function and anything that hides logic or intention behind a name. There are only two good reasons for abstractions: domain representation (ie. structure or use-case) and DRY principle. - -# 3. Sort things alphabetically - -There are many ways to order things. Perhaps dates at the end, identifiers at the top, some tag somewhere around the top. But when the thing evolves, we end up pushing new things at the end. The initial well-though organization is lost and no-one understand it anymore. - -The only arbitrary way to organize thing unequivocally is alphabetical order. Is it the best order? No. Is it well understood by everyone and almost applies in almost 100% of situation without even having to think about it? Yes. 
diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml deleted file mode 100644 index 87182da..0000000 --- a/crates/engine/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "engine" -version = "0.1.0" -edition = "2021" - -[lib] -crate-type = ["rlib"] - -[dependencies] -async-stream = "0.3.6" -clap = { version = "4.5", features = ["derive"] } -petgraph = "0.8.1" -reqwest = "0.12.15" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -serde_yaml = "0.9.34" -thiserror = "2.0.12" -tokio = { version = "1.0", features = ["full"] } -tokio-stream = "0.1.17" -tracing = "0.1.41" -tracing-subscriber = "0.3.19" -url = { version = "2.5.4", features = ["serde"] } -wasm-wave = "0.228.0" -wasmtime = { version = "32.0", features = ["wave"] } -wasmtime-wasi = "32.0" -wasmtime-wasi-http = "32.0" diff --git a/crates/engine/src/engine.rs b/crates/engine/src/engine.rs deleted file mode 100644 index 38108e8..0000000 --- a/crates/engine/src/engine.rs +++ /dev/null @@ -1,125 +0,0 @@ -pub mod context; -pub mod function_name; -pub mod interface_name; -pub mod source; - -use std::collections::HashMap; - -use context::Context; -use function_name::FunctionName; -use interface_name::InterfaceName; -use source::Source; -use wasmtime::component::{Component, Func}; - -pub struct Engine { - pub context: Context, - pub store: HashMap, -} - -impl Engine { - pub fn new() -> Result { - Ok(Self { - context: Context::new()?, - store: HashMap::new(), - }) - } - - pub async fn load_component(&mut self, source: &Source) -> Result<(), Error> { - if self.store.get(source).cloned().is_none() { - let component_bytes = source.clone().load().await?; - let component = Component::new(&self.context.engine, &component_bytes)?; - self.store.insert(source.clone(), component.clone()); - } - Ok(()) - } - - pub async fn load_components(&mut self, sources: &[Source]) -> Result<(), Error> { - for source in sources { - self.load_component(source).await?; - } - Ok(()) - } - - pub fn 
get_component(&self, source: &Source) -> Result<&Component, Error> { - self.store - .get(source) - .ok_or(Error::ComponentNotFound(source.clone())) - } - - pub fn get_context_mut(&mut self) -> &mut Context { - &mut self.context - } - - pub async fn get_func( - &mut self, - source: Source, - function_name: FunctionName, - interface_name: Option, - ) -> Result { - let component = self.get_component(&source)?.clone(); - let context = self.get_context_mut(); - let instance = context - .linker - .instantiate_async(&mut context.store, &component) - .await?; - Ok(match interface_name { - None => instance - .get_func(&mut self.context.store, &function_name.0) - .ok_or(Error::FunctionNotFound( - function_name, - self.get_available_exports(&component), - ))?, - Some(interface_name) => { - let interface = instance - .get_export(&mut self.context.store, None, &interface_name.0) - .ok_or_else(|| { - Error::InterfaceExportNotFound( - interface_name.clone(), - self.get_available_exports(&component), - ) - })?; - let export = instance - .get_export(&mut self.context.store, Some(&interface), &function_name.0) - .ok_or_else(|| { - Error::InterfaceFunctionNotFound( - function_name.clone(), - interface_name.clone(), - self.get_available_exports(&component), - ) - })?; - instance.get_func(&mut self.context.store, export).ok_or( - Error::FunctionNotFound( - function_name.clone(), - self.get_available_exports(&component), - ), - )? - } - }) - } - - pub fn get_available_exports(&self, component: &Component) -> Vec { - component - .component_type() - .exports(&self.context.engine) - .map(|(name, _)| name.to_string()) - .collect() - } -} - -type AvailableExports = Vec; - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Component not found: {0}")] - ComponentNotFound(Source), - #[error("Function {0} not found. Available exports: {1:?}")] - FunctionNotFound(FunctionName, AvailableExports), - #[error("Interface export not found: {0}. 
Available exports: {1:?}")] - InterfaceExportNotFound(InterfaceName, AvailableExports), - #[error("Function {0} not found in interface {1}. Available exports: {2:?}")] - InterfaceFunctionNotFound(FunctionName, InterfaceName, AvailableExports), - #[error(transparent)] - Source(#[from] source::Error), - #[error("Wasmtime error: {0}")] - Wasmtime(#[from] wasmtime::Error), -} diff --git a/crates/engine/src/engine/function_name.rs b/crates/engine/src/engine/function_name.rs deleted file mode 100644 index 3c50dd1..0000000 --- a/crates/engine/src/engine/function_name.rs +++ /dev/null @@ -1,12 +0,0 @@ -use std::fmt; - -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct FunctionName(pub String); - -impl fmt::Display for FunctionName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} diff --git a/crates/engine/src/engine/interface_name.rs b/crates/engine/src/engine/interface_name.rs deleted file mode 100644 index 084b567..0000000 --- a/crates/engine/src/engine/interface_name.rs +++ /dev/null @@ -1,12 +0,0 @@ -use std::fmt; - -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct InterfaceName(pub String); - -impl fmt::Display for InterfaceName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} diff --git a/crates/engine/src/engine/source.rs b/crates/engine/src/engine/source.rs deleted file mode 100644 index 409207e..0000000 --- a/crates/engine/src/engine/source.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::{ - fmt::{self, Display}, - path::PathBuf, -}; - -use serde::{Deserialize, Serialize}; -use url::Url; - -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -#[serde(untagged)] -pub enum Source { - Local(PathBuf), - Remote(Url), -} - -impl Source { - pub fn list_local_sources() -> Vec { - let allowed_root = Self::allowed_root(); - let 
mut sources = Vec::new(); - if let Ok(entries) = std::fs::read_dir(&allowed_root) { - for entry in entries.flatten() { - let path = entry.path(); - if path.is_file() { - sources.push(Source::Local(path)); - } - } - } - sources - } - - pub async fn load(self) -> Result, Error> { - let allowed_root = Self::allowed_root(); - match self { - Self::Local(path) => { - let path = path.canonicalize()?; - if !path.starts_with(&allowed_root) { - return Err(Error::ForbiddenPath(path)); - } - Ok(std::fs::read(path)?) - } - Self::Remote(url) => { - if url.scheme() != "https" { - return Err(Error::ForbiddenScheme(url.to_string())); - } - tracing::info!("Downloading component from URL: {}", url); - let response = reqwest::get(url).await?; - tracing::info!("Response status: {}", response.status()); - let bytes = response.bytes().await?; - tracing::info!("Downloaded {} bytes", bytes.len()); - Ok(bytes.to_vec()) - } - } - } - - fn allowed_root() -> PathBuf { - if let Ok(wasm_path) = std::env::var("WASM_PATH") { - if let Ok(path) = PathBuf::from(wasm_path).canonicalize() { - return path; - } - } - - if let Ok(cwd) = std::env::current_dir() { - let wasm_dir = cwd.join("wasm"); - if let Ok(path) = wasm_dir.canonicalize() { - return path; - } - } - - PathBuf::from("./wasm") - } -} - -impl Display for Source { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Local(path) => write!(f, "{}", path.display()), - Self::Remote(url) => write!(f, "{url}"), - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Invalid path: {0}")] - ForbiddenPath(PathBuf), - #[error("Forbidden scheme (only https is allowed): {0}")] - ForbiddenScheme(String), - #[error("HTTP error: {0}")] - Http(#[from] reqwest::Error), - #[error("IO error: {0}")] - Io(#[from] std::io::Error), -} diff --git a/crates/engine/src/executor.rs b/crates/engine/src/executor.rs deleted file mode 100644 index 7a57892..0000000 --- a/crates/engine/src/executor.rs +++ /dev/null @@ 
-1,120 +0,0 @@ -use std::collections::HashMap; - -use petgraph::{algo::toposort, graph::NodeIndex, visit::EdgeRef, Graph}; -use tokio_stream::Stream; -use wasmtime::component::Val; - -use crate::{ - engine::{function_name::FunctionName, Engine}, - metadata::InputName, - workflow::{ - node::{Data, Node, NodeId}, - Workflow, - }, -}; - -pub struct Executor { - graph: Graph, - nodes: Vec, -} - -impl Executor { - pub fn new(workflow: &Workflow) -> Result { - let mut graph = Graph::new(); - let mut node_indices = HashMap::new(); - - // Add nodes and store their indices - for (id, node) in &workflow.nodes { - let node_index = graph.add_node(node.clone()); - node_indices.insert(id.clone(), node_index); - } - - // Add edges using the stored indices - for edge in &workflow.edges { - if let (Some(source_idx), Some(target_idx)) = ( - node_indices.get(&edge.source), - node_indices.get(&edge.target), - ) { - graph.add_edge(*source_idx, *target_idx, 0); - } - } - - let nodes = toposort(&graph, None).map_err(|e| Error::CyclicGraph(e.node_id().index()))?; - Ok(Self { graph, nodes }) - } - - pub fn run<'a>( - self, - engine: &'a mut Engine, - ) -> impl Stream, Error>)> + 'a { - let mut node_outputs: HashMap> = HashMap::new(); - - async_stream::stream! 
{ - for node_idx in &self.nodes { - let node = self.graph.node_weight(*node_idx).unwrap(); - let Data { function, source, interface, inputs } = &node.data; - let result = async { - let values = { - let func = &engine.get_func( - source.clone(), - function.clone(), - interface.clone(), - ).await?; - let mut input_values: HashMap = HashMap::new(); - - for edge in self.graph.edges_directed(*node_idx, petgraph::Incoming) { - let parent_idx = edge.source(); - let parent = self.graph.node_weight(parent_idx).unwrap(); - let edge_index = *edge.weight(); - if let Some(parent_outputs) = node_outputs.get(&parent.id) { - if let Some(output) = parent_outputs.first() { - input_values.insert(edge_index, output.clone()); - } - } - } - - let params = func - .params(&mut engine.context.store) - .iter() - .enumerate() - .map(|(i, (name, ty))| -> Result { - match input_values.get(&i) { - Some(val) => Ok(val.clone()), - None => { - if let Some(input) = inputs.get(name) { - Ok(Val::from_wave(ty, input)?) - } else { - Err(Error::MissingParameter(function.clone(), InputName(name.to_string()))) - } - } - } - }) - .collect::, Error>>()?; - let mut output = vec![Val::S32(0)]; - func.call_async(&mut engine.context.store, ¶ms, &mut output) - .await?; - output.to_vec() - }; - tracing::info!("executed node {:?} with output {:?}", node, values.clone()); - node_outputs.insert(node.id.clone(), values.clone()); - Ok::, Error>(values.iter().map(|v| v.to_wave().unwrap()).collect()) - }.await; - yield (node.id.clone(), result); - } - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Cyclic graph detected at index {0}")] - CyclicGraph(usize), - #[error(transparent)] - Engine(#[from] crate::engine::Error), - #[error("Missing parameter: {0} for node {1}")] - MissingParameter(FunctionName, InputName), - #[error("Wasmtime error: {0}")] - Wasmtime(#[from] wasmtime::Error), - #[error("WasmWave error: {0}")] - WasmWave(#[from] wasm_wave::parser::ParserError), -} diff --git 
a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs deleted file mode 100644 index 87f822e..0000000 --- a/crates/engine/src/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod engine; -pub mod executor; -pub mod metadata; -pub mod workflow; - -pub use engine::*; diff --git a/crates/engine/src/main.rs b/crates/engine/src/main.rs deleted file mode 100644 index cfb9627..0000000 --- a/crates/engine/src/main.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::{path::PathBuf, pin::pin}; - -use clap::{Parser, Subcommand}; -use engine::{executor::Executor, metadata::Metadata, workflow::Workflow, Engine}; -use tokio_stream::StreamExt; - -/// A CLI tool for executing workflows -#[derive(Debug, Parser)] -#[command(author, version, about, long_about = None)] -struct Args { - #[command(subcommand)] - command: Commands, -} - -#[derive(Debug, Subcommand)] -enum Commands { - Parse { - /// Path to the workflow manifest file - #[arg(short, long)] - workflow: PathBuf, - }, - Run { - /// Path to the workflow manifest file - #[arg(short, long)] - workflow: PathBuf, - }, -} - -#[tokio::main] -async fn main() -> Result<(), Error> { - tracing_subscriber::fmt::init(); - let args = Args::parse(); - - match args.command { - Commands::Parse { workflow } => { - let workflow = load_workflow(&workflow)?; - let mut engine = Engine::new()?; - let metadata = Metadata::new(&mut engine, workflow.sources()).await; - println!("{}", serde_json::to_string_pretty(&metadata)?); - } - Commands::Run { workflow } => { - let workflow = load_workflow(&workflow)?; - let mut engine = Engine::new()?; - engine.load_components(&workflow.sources()).await?; - let executor = Executor::new(&workflow)?; - let stream = executor.run(&mut engine); - let mut stream = pin!(stream); - while let Some((node_id, result)) = stream.next().await { - match result { - Ok(outputs) => { - println!("Node {node_id:?} output: {outputs:?}"); - } - Err(e) => { - println!("Node {node_id:?} error: {e:?}"); - } - } - } - } - } - - Ok(()) -} - -fn 
load_workflow(path: &PathBuf) -> Result { - let source = std::fs::read_to_string(path)?; - let is_json = path.extension().is_some_and(|ext| ext == "json"); - let workflow: Workflow = match is_json { - true => serde_json::from_str(&source)?, - false => serde_yaml::from_str(&source)?, - }; - Ok(workflow) -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error(transparent)] - Engine(#[from] engine::Error), - #[error(transparent)] - Executor(#[from] engine::executor::Error), - #[error(transparent)] - File(#[from] std::io::Error), - #[error(transparent)] - Json(#[from] serde_json::Error), - #[error(transparent)] - Yaml(#[from] serde_yaml::Error), -} diff --git a/crates/engine/src/metadata.rs b/crates/engine/src/metadata.rs deleted file mode 100644 index 00b9f49..0000000 --- a/crates/engine/src/metadata.rs +++ /dev/null @@ -1,71 +0,0 @@ -mod to_json_schema; - -use std::{collections::HashMap, fmt}; - -use serde::{Deserialize, Serialize}; -use wasmtime::component::types::ComponentItem; - -use crate::{ - engine::{function_name::FunctionName, source::Source, Engine}, - metadata::to_json_schema::ToJsonSchema, -}; - -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -pub struct Metadata { - pub sources: HashMap>, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Func { - pub inputs: HashMap, - pub name: FunctionName, -} - -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct InputName(pub String); - -impl fmt::Display for InputName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Input { - pub name: InputName, - pub schema: String, -} - -impl Metadata { - pub async fn new(engine: &mut Engine, sources: Vec) -> Self { - let mut metadata: Metadata = Default::default(); - engine.load_components(&sources).await.unwrap(); - for source in sources { - let component = engine.get_component(&source).unwrap(); - let 
mut functions = HashMap::new(); - for (func_name, func) in component.component_type().exports(&engine.context.engine) { - println!("func: {func:?}"); - if let ComponentItem::ComponentFunc(func) = func { - let function_name = FunctionName(func_name.to_string()); - let mut function = Func { - name: function_name.clone(), - inputs: HashMap::new(), - }; - for (name, ty) in func.params() { - let input_name = InputName(name.to_string()); - function.inputs.insert( - input_name.clone(), - Input { - name: input_name.clone(), - schema: serde_json::to_string(&ty.to_json_schema()).unwrap(), - }, - ); - } - functions.insert(function_name, function); - } - } - metadata.sources.insert(source, functions); - } - metadata - } -} diff --git a/crates/engine/src/metadata/to_json_schema.rs b/crates/engine/src/metadata/to_json_schema.rs deleted file mode 100644 index b99541a..0000000 --- a/crates/engine/src/metadata/to_json_schema.rs +++ /dev/null @@ -1,109 +0,0 @@ -use serde_json::{json, Value}; -use wasmtime::component::Type; - -pub trait ToJsonSchema { - fn to_json_schema(&self) -> Value; -} - -impl ToJsonSchema for Type { - fn to_json_schema(&self) -> Value { - match &self { - Type::Bool => json!({ "type": "boolean" }), - Type::Borrow(_) => todo!(), - Type::Char | Type::String => json!({ "type": "string" }), - Type::Enum(enum_) => { - let values = enum_.names().map(|c| c.to_string()).collect::>(); - json!({ - "type": "string", - "enum": values - }) - } - Type::Flags(flags) => { - let properties = flags - .names() - .map(|f| (f.to_string(), json!({ "type": "boolean" }))) - .collect::>(); - json!({ - "type": "object", - "properties": properties - }) - } - Type::Float32 | Type::Float64 => json!({ "type": "number" }), - Type::List(list) => { - let items = ToJsonSchema::to_json_schema(&list.ty()); - json!({ - "type": "array", - "items": items - }) - } - Type::Option(opt) => { - let inner = ToJsonSchema::to_json_schema(&opt.ty()); - json!({ - "anyOf": [ - { "type": "null" }, - inner - ] - 
}) - } - Type::Own(_) => todo!(), - Type::Record(record) => { - let properties = record - .fields() - .map(|f| (f.name.to_string(), ToJsonSchema::to_json_schema(&f.ty))) - .collect::>(); - json!({ - "type": "object", - "properties": properties - }) - } - Type::Result(result) => { - let ok = result - .ok() - .map(|ok| ToJsonSchema::to_json_schema(&ok)) - .unwrap_or_else(|| json!(null)); - let err = result - .err() - .map(|err| ToJsonSchema::to_json_schema(&err)) - .unwrap_or_else(|| json!(null)); - json!({ - "type": "object", - "properties": { - "Ok": ok, - "Err": err - } - }) - } - Type::S8 | Type::S16 | Type::S32 | Type::S64 => { - json!({ "type": "integer", "format": "int" }) - } - Type::Tuple(tuple) => { - let items = tuple - .types() - .map(|t| ToJsonSchema::to_json_schema(&t)) - .collect::>(); - json!({ - "type": "array", - "prefixItems": items - }) - } - Type::U8 | Type::U16 | Type::U32 | Type::U64 => { - json!({ "type": "integer", "format": "uint" }) - } - Type::Variant(variant) => { - let one_of = variant - .cases() - .map(|c| { - let ty = ToJsonSchema::to_json_schema(&c.ty.unwrap()); - json!({ - "title": c.name.to_string(), - "allOf": [ty] - }) - }) - .collect::>(); - json!({ - "oneOf": one_of - }) - } - } - } -} diff --git a/crates/engine/src/workflow.rs b/crates/engine/src/workflow.rs deleted file mode 100644 index 86fa92e..0000000 --- a/crates/engine/src/workflow.rs +++ /dev/null @@ -1,37 +0,0 @@ -pub mod edge; -pub mod node; - -use std::collections::HashMap; - -use edge::Edge; -use serde::{Deserialize, Serialize}; - -use self::node::{Node, NodeId}; -use crate::engine::source::Source; - -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -pub struct Workflow { - pub edges: Vec, - pub nodes: HashMap, -} - -impl Workflow { - pub fn sources(&self) -> Vec { - let mut sources = vec![]; - for node in self.nodes.values() { - sources.push(node.data.source.clone()); - } - sources - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn 
test_deserialize_workflow() { - let json = include_str!("../examples/hello-world.json"); - let _: Workflow = serde_json::from_str(json).unwrap(); - } -} diff --git a/crates/engine/src/workflow/node.rs b/crates/engine/src/workflow/node.rs deleted file mode 100644 index 386ca28..0000000 --- a/crates/engine/src/workflow/node.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::{collections::HashMap, fmt::Display}; - -use serde::{Deserialize, Serialize}; - -use crate::engine::{function_name::FunctionName, interface_name::InterfaceName, source::Source}; - -#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct NodeId(pub String); - -impl Display for NodeId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Node { - pub data: Data, - pub id: NodeId, - pub position: Position, - pub r#type: String, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Data { - pub function: FunctionName, - #[serde(default)] - pub inputs: HashMap, - pub interface: Option, - pub source: Source, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Position { - pub x: f32, - pub y: f32, -} diff --git a/crates/file-source/Cargo.toml b/crates/file-source/Cargo.toml new file mode 100644 index 0000000..9baad4e --- /dev/null +++ b/crates/file-source/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "file-source" +version = "0.1.0" +edition = "2024" + +[dependencies] +regex.workspace = true +reqwest.workspace = true +serde.workspace = true +thiserror.workspace = true +oci-client.workspace = true +url.workspace = true + +[target.'cfg(all())'.dependencies] +clap.workspace = true +tokio.workspace = true diff --git a/crates/file-source/README.md b/crates/file-source/README.md new file mode 100644 index 0000000..5cc5a1e --- /dev/null +++ b/crates/file-source/README.md @@ -0,0 +1 @@ +Load bytes from a file source which could be remote, local or OCI. 
diff --git a/crates/file-source/src/file_source.rs b/crates/file-source/src/file_source.rs new file mode 100644 index 0000000..3575a3d --- /dev/null +++ b/crates/file-source/src/file_source.rs @@ -0,0 +1,111 @@ +mod local; +mod oci; +mod remote; + +use std::path::PathBuf; + +pub use local::LocalFileSource; +pub use oci::OciFileSource; +pub use remote::RemoteFileSource; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use url::Url; + +#[derive(Clone, Debug)] +pub enum FileSource { + Local(LocalFileSource), + Remote(RemoteFileSource), + Oci(OciFileSource), +} + +impl FileSource { + pub fn local(path: PathBuf) -> Self { + Self::Local(LocalFileSource::new(path)) + } + + pub fn remote(url: String) -> Self { + Self::Remote(RemoteFileSource::new(url)) + } + + pub fn oci(url: String) -> Self { + Self::Oci(OciFileSource::new(url)) + } + + pub fn parse(reference: &str) -> Result { + if let Ok(url) = Url::parse(reference) { + match url.scheme() { + "https" | "http" => return Ok(Self::remote(reference.to_string())), + "oci" => return Ok(Self::oci(url.path().to_string())), + _ => {} + } + } + + let path = PathBuf::from(reference); + if path.starts_with("./") || path.starts_with("../") || path.starts_with("/") { + return Ok(Self::local(path)); + } + + if Self::is_oci_image_ref(reference) { + return Ok(Self::oci(reference.to_string())); + } + + Err(Error::Parse(format!( + "Unable to identify reference type: {}", + reference + ))) + } + + fn is_oci_image_ref(reference: &str) -> bool { + let pattern = + regex::Regex::new(r"^[\w\.-]+(?:/[\w\.-]+)+(?::[\w\.-]+)?(?:@sha256:[a-fA-F0-9]+)?$") + .unwrap(); + pattern.is_match(reference) + } + + pub async fn load(&self) -> Result, Error> { + Ok(match self { + FileSource::Local(source) => source.load().await?, + FileSource::Remote(source) => source.load().await?, + FileSource::Oci(source) => source.load().await?, + }) + } + + fn as_str(&self) -> String { + match self { + FileSource::Local(source) => 
source.path().to_string_lossy().into_owned(), + FileSource::Remote(source) => source.url().to_string(), + FileSource::Oci(source) => source.url().to_string(), + } + } +} + +impl<'de> Deserialize<'de> for FileSource { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Self::parse(&s).map_err(serde::de::Error::custom) + } +} + +impl Serialize for FileSource { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.as_str()) + } +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Failed to read local file: {0}")] + Local(#[from] local::Error), + #[error("Failed to pull OCI artifact: {0}")] + Oci(#[from] oci::Error), + #[error("Failed to read remote file: {0}")] + Remote(#[from] remote::Error), + #[error("Failed to parse file source: {0}")] + Parse(String), +} diff --git a/crates/file-source/src/file_source/local.rs b/crates/file-source/src/file_source/local.rs new file mode 100644 index 0000000..096c721 --- /dev/null +++ b/crates/file-source/src/file_source/local.rs @@ -0,0 +1,27 @@ +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; +use tokio::fs; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct LocalFileSource(PathBuf); + +impl LocalFileSource { + pub fn new(path: PathBuf) -> Self { + Self(path) + } + + pub fn path(&self) -> &PathBuf { + &self.0 + } + + pub async fn load(&self) -> Result, Error> { + Ok(fs::read(&self.0).await?) 
+ } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Failed to read local file: {0}")] + Io(#[from] std::io::Error), +} diff --git a/crates/file-source/src/file_source/oci.rs b/crates/file-source/src/file_source/oci.rs new file mode 100644 index 0000000..437a1ad --- /dev/null +++ b/crates/file-source/src/file_source/oci.rs @@ -0,0 +1,33 @@ +use oci_client::{Client, Reference, secrets::RegistryAuth}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct OciFileSource(String); + +impl OciFileSource { + pub fn new(url: String) -> Self { + Self(url) + } + + pub fn url(&self) -> &str { + &self.0 + } + + pub async fn load(&self) -> Result, Error> { + let client = Client::default(); + let reference: Reference = self.0.parse()?; + let auth = RegistryAuth::Anonymous; + let image_data = client + .pull(&reference, &auth, vec!["application/wasm"]) + .await?; + Ok(image_data.layers[0].data.clone()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Failed to pull OCI artifact: {0}")] + OciDistribution(#[from] oci_client::errors::OciDistributionError), + #[error("Failed to parse OCI reference: {0}")] + Parse(#[from] oci_client::ParseError), +} diff --git a/crates/file-source/src/file_source/remote.rs b/crates/file-source/src/file_source/remote.rs new file mode 100644 index 0000000..1de67bd --- /dev/null +++ b/crates/file-source/src/file_source/remote.rs @@ -0,0 +1,28 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RemoteFileSource(String); + +impl RemoteFileSource { + pub fn new(url: String) -> Self { + Self(url) + } + + pub fn url(&self) -> &str { + &self.0 + } + + pub async fn load(&self) -> Result, Error> { + Ok(reqwest::get(&self.0) + .await? + .bytes() + .await + .map(|b| b.to_vec())?) 
+ } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Failed to read remote file: {0}")] + Reqwest(#[from] reqwest::Error), +} diff --git a/crates/file-source/src/lib.rs b/crates/file-source/src/lib.rs new file mode 100644 index 0000000..932a678 --- /dev/null +++ b/crates/file-source/src/lib.rs @@ -0,0 +1,3 @@ +mod file_source; + +pub use file_source::*; diff --git a/crates/file-source/src/main.rs b/crates/file-source/src/main.rs new file mode 100644 index 0000000..a433a10 --- /dev/null +++ b/crates/file-source/src/main.rs @@ -0,0 +1,19 @@ +use clap::Parser; +use file_source::FileSource; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + source: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + + let source = FileSource::parse(&cli.source)?; + let content = source.load().await?; + println!("File size: {} bytes", content.len()); + + Ok(()) +} diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml new file mode 100644 index 0000000..cc6ddf2 --- /dev/null +++ b/crates/runtime/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "runtime" +version = "0.1.0" +edition = "2024" + +[lib] +crate-type = ["rlib"] + +[dependencies] +file-source = { path = "../file-source" } +petgraph.workspace = true +thiserror.workspace = true +tracing-subscriber.workspace = true +# tracing.workspace = true +wasmtime-wasi-http.workspace = true +wasmtime-wasi.workspace = true +wasmtime.workspace = true +workflow = { path = "../workflow" } + +[target.'cfg(all())'.dependencies] +clap.workspace = true +tokio.workspace = true diff --git a/crates/engine/README.md b/crates/runtime/README.md similarity index 100% rename from crates/engine/README.md rename to crates/runtime/README.md diff --git a/crates/engine/examples/hello-world.json b/crates/runtime/examples/hello-world.json similarity index 100% rename from crates/engine/examples/hello-world.json rename to 
crates/runtime/examples/hello-world.json diff --git a/crates/engine/examples/http.json b/crates/runtime/examples/http.json similarity index 100% rename from crates/engine/examples/http.json rename to crates/runtime/examples/http.json diff --git a/crates/engine/src/engine/context.rs b/crates/runtime/src/context.rs similarity index 65% rename from crates/engine/src/engine/context.rs rename to crates/runtime/src/context.rs index b75d974..b017afe 100644 --- a/crates/engine/src/engine/context.rs +++ b/crates/runtime/src/context.rs @@ -1,6 +1,6 @@ use wasmtime::{ - component::{Linker, ResourceTable}, - Config, Engine, Result, Store, + Config, Engine, Store, + component::{Component, Func, Linker, ResourceTable}, }; use wasmtime_wasi::{IoView, WasiCtx, WasiCtxBuilder, WasiView}; use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; @@ -12,7 +12,7 @@ pub struct Context { } impl Context { - pub fn new() -> Result { + pub fn new() -> Result { let mut config = Config::new(); config.async_support(true); let engine = Engine::new(&config)?; @@ -38,6 +38,16 @@ impl Context { store, }) } + + pub async fn get_func(&mut self, component: &Component, name: &str) -> Result { + let instance = self + .linker + .instantiate_async(&mut self.store, component) + .await?; + instance + .get_func(&mut self.store, name) + .ok_or_else(|| Error::FunctionNotFound(name.to_string())) + } } pub struct State { @@ -63,3 +73,11 @@ impl WasiHttpView for State { &mut self.http } } + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Function not found: {0}")] + FunctionNotFound(String), + #[error("Wasmtime error: {0}")] + Wasmtime(#[from] wasmtime::Error), +} diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs new file mode 100644 index 0000000..4b6cb28 --- /dev/null +++ b/crates/runtime/src/lib.rs @@ -0,0 +1,6 @@ +pub mod prototype; +mod runtime; +mod state; +pub mod task; + +pub use runtime::*; diff --git a/crates/runtime/src/main.rs b/crates/runtime/src/main.rs new file mode 
100644 index 0000000..e99be1d --- /dev/null +++ b/crates/runtime/src/main.rs @@ -0,0 +1,86 @@ +use std::path::PathBuf; + +use clap::{Parser, Subcommand}; +use runtime::{ + Runtime, + prototype::Prototype, + task::{Event, Task}, +}; +use workflow::Workflow; + +/// A CLI tool for executing workflows +#[derive(Debug, Parser)] +#[command(author, version, about, long_about = None)] +struct Args { + #[command(subcommand)] + command: Commands, +} + +#[derive(Debug, Subcommand)] +enum Commands { + Parse { + /// Path to the workflow manifest file + #[arg(short, long)] + workflow: PathBuf, + }, + Run { + /// Path to the workflow manifest file + #[arg(short, long)] + workflow: PathBuf, + }, +} + +impl Commands { + fn workflow(&self) -> &PathBuf { + match self { + Commands::Parse { workflow } => workflow, + Commands::Run { workflow } => workflow, + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt::init(); + let args = Args::parse(); + let mut runtime = Runtime::new()?; + let workflow = Workflow::load(args.command.workflow())?; + let prototype = Prototype::new(&mut runtime, &workflow).await?; + + if let Commands::Run { .. 
} = args.command { + let mut task = Task::new(&mut runtime, &prototype).await?; + let mut subscribe = task.subscribe(); + + tokio::spawn(async move { + task.run().await; + }); + + while let Ok(event) = subscribe.recv().await { + match event { + Event::ExecutionStarted(node_id, params) => { + println!("{node_id} started with params: {params:?}") + } + Event::ExecutionSucceeded(node_id, output) => { + println!("{node_id} succeeded with output: {output:?}") + } + Event::ExecutionFailed(node_id, error) => { + eprintln!("{node_id} failed with error: {error:?}") + } + } + } + } + + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Prototype(#[from] runtime::prototype::Error), + #[error(transparent)] + Runtime(#[from] runtime::Error), + #[error(transparent)] + Task(#[from] runtime::task::Error), + #[error(transparent)] + Workflow(#[from] workflow::Error), +} diff --git a/crates/runtime/src/prototype.rs b/crates/runtime/src/prototype.rs new file mode 100644 index 0000000..c5ce60f --- /dev/null +++ b/crates/runtime/src/prototype.rs @@ -0,0 +1,146 @@ +use std::collections::HashMap; + +use petgraph::{Graph, acyclic::Acyclic, graph::NodeIndex}; +use wasmtime::component::{Component, ComponentExportIndex, Val, types::ComponentItem}; +use workflow::{ComponentName, Edge, FunctionName, InputName, Node, NodeId, Workflow}; + +use crate::runtime::Runtime; + +/// A `Prototype` represents a compiled, static workflow definition. +/// +/// It holds: +/// - A list of compiled WebAssembly `Component`. +/// - A graph of nodes (functions and values) and edges (inputs) representing the workflow. +/// +/// `Prototype` instances are created once and can be executed many times +/// by spawning new `Task` instances. Compilation is cached by the +/// `Program`'s `Engine`, making `Prototype` instantiation cheap. 
+pub struct Prototype { + pub(crate) components: HashMap, + pub(crate) graph: Acyclic>, +} + +impl Prototype { + pub async fn new(runtime: &mut Runtime, workflow: &Workflow) -> Result { + // Compiled components + let mut components = HashMap::new(); + + // Graph of nodes and edges + let mut graph = Graph::new(); + let mut node_indices = HashMap::new(); + + for (node_id, node) in &workflow.nodes { + // First, we get the component or we load it + let component = match components.get(&node.r#use) { + Some(component) => component, + None => { + let file_source = workflow + .dependencies + .get(&node.r#use) + .ok_or(Error::DependencyNotFound(node.r#use.clone()))?; + let bytes = file_source.load().await?; + let component = Component::from_binary(&runtime.engine, &bytes)?; + components.insert(node.r#use.clone(), component); + components.get(&node.r#use).unwrap() + } + }; + + // Then, we lookup the corresponding function + let (item, index) = component.export_index(None, &node.run).ok_or({ + Error::MissingFunction( + node.run.clone(), + component + .component_type() + .exports(&runtime.engine) + .map(|(name, _)| name.to_string()) + .collect(), + ) + })?; + + if let ComponentItem::ComponentFunc(func) = item { + // We add the node's function to the graph + let node_index = graph.add_node(NodeType::Function(Function { + component_name: node.r#use.clone(), + index, + node_id: node_id.clone(), + val: None, + })); + node_indices.insert(node_id, node_index); + + // We add the node's inputs to the graph + for (name, ty) in func.params() { + let input_name = InputName(name.to_string()); + if let Some(value) = node.with.get(&input_name) { + let val = Val::from_wave(&ty, value)?; + let input_index = graph.add_node(NodeType::Value(val.clone())); + graph.add_edge(node_index, input_index, input_name.clone()); + } else { + Err(Error::MissingInput(input_name.clone()))?; + } + } + } else { + Err(Error::InvalidNode(node.clone()))?; + } + } + + // Finally, we connect the nodes' function to 
each other + for edge in &workflow.edges { + if let (Some(source_idx), Some(target_idx)) = ( + node_indices.get(&edge.source), + node_indices.get(&edge.target), + ) { + graph.add_edge(*source_idx, *target_idx, edge.input.clone()); + } else { + Err(Error::InvalidEdge(edge.clone()))?; + } + } + + let graph = + Acyclic::try_from_graph(graph).map_err(|cycle| Error::Cycle(cycle.node_id()))?; + + Ok(Self { components, graph }) + } +} + +#[derive(Clone, Debug)] +pub enum NodeType { + Function(Function), + Value(Val), +} + +impl NodeType { + pub fn set_val(&mut self, val: Val) { + match self { + NodeType::Function(function) => function.val = Some(val), + NodeType::Value(val) => *val = val.clone(), + } + } +} + +#[derive(Clone, Debug)] +pub struct Function { + pub(crate) component_name: ComponentName, + pub(crate) index: ComponentExportIndex, + pub(crate) node_id: NodeId, + pub(crate) val: Option, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Cycle detected: {0:?}")] + Cycle(NodeIndex), + #[error("Dependency not found: {0:?}")] + DependencyNotFound(ComponentName), + #[error("File source error: {0}")] + FileSource(#[from] file_source::Error), + #[error("Invalid edge: {0:?}")] + InvalidEdge(Edge), + #[error("Invalid node: {0:?}")] + InvalidNode(Node), + #[error("Missing function: {0:?}, available: {1:?}")] + MissingFunction(FunctionName, Vec), + #[error("Missing input: {0:?}")] + MissingInput(InputName), + #[error("Wasmtime error: {0}")] + Wasmtime(#[from] wasmtime::Error), +} diff --git a/crates/runtime/src/runtime.rs b/crates/runtime/src/runtime.rs new file mode 100644 index 0000000..e17b4c0 --- /dev/null +++ b/crates/runtime/src/runtime.rs @@ -0,0 +1,30 @@ +pub use wasmtime::Error; +use wasmtime::{Config, Engine, component::Linker}; + +use crate::state::State; + +/// The `Runtime` owns the global execution context for workflows. +/// +/// It holds: +/// - A single `Engine`, which caches compiled modules and components. 
+/// - A shared `Linker`, which registers host functions, capabilities, and +/// shared components available to all workflows. +/// +/// The `Runtime` can compile multiple `Prototype` instances (static workflows) +/// and spawn multiple independent `Task` executions from them. +pub struct Runtime { + pub engine: Engine, + pub linker: Linker, +} + +impl Runtime { + pub fn new() -> Result { + let mut config = Config::new(); + config.async_support(true); + let engine = Engine::new(&config)?; + let mut linker = Linker::::new(&engine); + wasmtime_wasi::add_to_linker_async(&mut linker)?; + wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)?; + Ok(Self { engine, linker }) + } +} diff --git a/crates/runtime/src/state.rs b/crates/runtime/src/state.rs new file mode 100644 index 0000000..145ceac --- /dev/null +++ b/crates/runtime/src/state.rs @@ -0,0 +1,43 @@ +use wasmtime::component::ResourceTable; +use wasmtime_wasi::{IoView, WasiCtx, WasiCtxBuilder, WasiView}; +use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; + +pub struct State { + ctx: WasiCtx, + table: ResourceTable, + http: WasiHttpCtx, +} + +impl Default for State { + fn default() -> Self { + Self::new() + } +} + +impl State { + pub fn new() -> Self { + Self { + ctx: WasiCtxBuilder::new().build(), + table: ResourceTable::new(), + http: WasiHttpCtx::new(), + } + } +} + +impl IoView for State { + fn table(&mut self) -> &mut ResourceTable { + &mut self.table + } +} + +impl WasiView for State { + fn ctx(&mut self) -> &mut WasiCtx { + &mut self.ctx + } +} + +impl WasiHttpView for State { + fn ctx(&mut self) -> &mut WasiHttpCtx { + &mut self.http + } +} diff --git a/crates/runtime/src/task.rs b/crates/runtime/src/task.rs new file mode 100644 index 0000000..b0e53e1 --- /dev/null +++ b/crates/runtime/src/task.rs @@ -0,0 +1,146 @@ +use std::collections::HashMap; + +use petgraph::{Graph, algo::toposort, graph::NodeIndex, visit::EdgeRef}; +use tokio::sync::broadcast::{Receiver, Sender, channel}; +use wasmtime::{ 
+ Result, Store, + component::{Instance, Val}, +}; +use workflow::{ComponentName, InputName, NodeId}; + +use crate::{ + prototype::{NodeType, Prototype}, + runtime::Runtime, + state::State, +}; + +/// A `Task` represents a single, isolated execution of a workflow prototype. +/// +/// It holds: +/// - A new `Store`, providing isolated memory, globals, tables, and state. +/// - An `Instance` of a `Prototype` within that store. +/// +/// Each `Task` runs independently from others, even if derived from the same +/// `Prototype`. They may share capabilities via the `Program`'s `Linker`, but +/// do not share memory or instances directly. Any shared state must be managed +/// externally via host functions or global services. +pub struct Task { + sender: Sender, + graph: Graph, + instances: HashMap, + store: Store, +} + +impl Task { + pub async fn new(runtime: &mut Runtime, prototype: &Prototype) -> Result { + let mut store = Store::new(&runtime.engine, State::new()); + + let mut instances = HashMap::new(); + for (component_name, component) in &prototype.components { + let instance = runtime + .linker + .instantiate_async(&mut store, component) + .await?; + instances.insert(component_name.clone(), instance); + } + + let (sender, _) = channel(32); + + Ok(Self { + sender, + graph: prototype.graph.inner().clone(), + instances, + store, + }) + } + + pub fn subscribe(&self) -> Receiver { + self.sender.subscribe() + } + + pub async fn run(&mut self) { + let nodes = self.prepare(); + for (node_index, inputs_index) in nodes { + let node = self.graph.node_weight(node_index).unwrap(); + let mut val = None; + if let NodeType::Function(function) = node { + if function.val.is_none() { + let instance = self.instances.get(&function.component_name).unwrap(); + let func = instance.get_func(&mut self.store, function.index).unwrap(); + + let mut params = Vec::new(); + for (name, _) in func.params(&self.store) { + let input_index = *inputs_index.get(&InputName(name.to_string())).unwrap(); + 
match self.graph.node_weight(input_index).unwrap() { + NodeType::Value(val) => params.push(val.clone()), + NodeType::Function(function) => { + params.push(function.val.clone().unwrap()) + } + } + } + + self.sender + .send(Event::ExecutionStarted( + function.node_id.clone(), + params.clone(), + )) + .unwrap(); + + let results = func.results(&self.store).len(); + + // We need to set a default value for the output or we get "expected 1 results(s), got 0" error + let mut outputs = vec![Val::S32(0); results]; + if let Err(e) = func + .call_async(&mut self.store, ¶ms, &mut outputs) + .await + { + self.sender + .send(Event::ExecutionFailed( + function.node_id.clone(), + e.to_string(), + )) + .unwrap(); + } else if let Some(output) = outputs.first() { + val = Some(output.clone()); + self.sender + .send(Event::ExecutionSucceeded( + function.node_id.clone(), + output.clone(), + )) + .unwrap(); + } + func.post_return_async(&mut self.store).await.unwrap(); + } + } + + if let Some(val) = val { + self.graph.node_weight_mut(node_index).unwrap().set_val(val); + } + } + } + + fn prepare(&self) -> Vec<(NodeIndex, HashMap)> { + let mut nodes = Vec::new(); + for node_index in toposort(&self.graph, None).unwrap() { + let edges = self.graph.edges_directed(node_index, petgraph::Incoming); + let inputs = edges + .map(|edge| (edge.weight().clone(), edge.source())) + .collect(); + nodes.push((node_index, inputs)); + } + nodes + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Wasmtime error: {0}")] + Wasmtime(#[from] wasmtime::Error), +} + +#[derive(Clone, Debug)] +pub enum Event { + ExecutionFailed(NodeId, String), + ExecutionStarted(NodeId, Vec), + ExecutionSucceeded(NodeId, Val), +} diff --git a/crates/workflow/Cargo.toml b/crates/workflow/Cargo.toml new file mode 100644 index 0000000..567c99a --- /dev/null +++ b/crates/workflow/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "workflow" +version = "0.1.0" +edition = "2024" + +[dependencies] +file-source = { path = 
"../file-source" } +serde.workspace = true +serde_json.workspace = true +serde_yaml.workspace = true +thiserror.workspace = true + +[target.'cfg(all())'.dependencies] +clap.workspace = true +tokio.workspace = true diff --git a/crates/workflow/README.md b/crates/workflow/README.md new file mode 100644 index 0000000..b14ac90 --- /dev/null +++ b/crates/workflow/README.md @@ -0,0 +1 @@ +Serializer/deserializer for the workflow JSON/YAML format. diff --git a/crates/workflow/src/lib.rs b/crates/workflow/src/lib.rs new file mode 100644 index 0000000..70eb81c --- /dev/null +++ b/crates/workflow/src/lib.rs @@ -0,0 +1,2 @@ +mod workflow; +pub use workflow::*; diff --git a/crates/workflow/src/main.rs b/crates/workflow/src/main.rs new file mode 100644 index 0000000..212d9ba --- /dev/null +++ b/crates/workflow/src/main.rs @@ -0,0 +1,30 @@ +use clap::Parser; +use workflow::Workflow; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + source: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + + let content = tokio::fs::read_to_string(&cli.source).await?; + let workflow: Workflow = if cli.source.ends_with(".json") { + serde_json::from_str(&content)? + } else if cli.source.ends_with(".yaml") || cli.source.ends_with(".yml") { + serde_yaml::from_str(&content)? 
+ } else { + return Err("File must be either JSON or YAML".into()); + }; + + println!( + "Workflow parsed successfully with {} nodes and {} edges", + workflow.nodes.len(), + workflow.edges.len() + ); + + Ok(()) +} diff --git a/crates/workflow/src/workflow.rs b/crates/workflow/src/workflow.rs new file mode 100644 index 0000000..3a3f9c8 --- /dev/null +++ b/crates/workflow/src/workflow.rs @@ -0,0 +1,50 @@ +mod edge; +mod node; +mod types; +use std::{collections::HashMap, path::PathBuf}; + +pub use edge::*; +use file_source::FileSource; +pub use node::*; +use serde::{Deserialize, Serialize}; +pub use types::*; + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct Workflow { + pub dependencies: HashMap, + pub edges: Vec, + pub nodes: HashMap, +} + +impl Workflow { + pub fn load(path: &PathBuf) -> Result { + let source = std::fs::read_to_string(path)?; + let is_json = path.extension().is_some_and(|ext| ext == "json"); + let workflow: Workflow = match is_json { + true => serde_json::from_str(&source)?, + false => serde_yaml::from_str(&source)?, + }; + Ok(workflow) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + File(#[from] std::io::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), + #[error(transparent)] + Yaml(#[from] serde_yaml::Error), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_deserialize_workflow() { + let json = include_str!("../../../examples/hello-world.json"); + let _: Workflow = serde_json::from_str(json).unwrap(); + } +} diff --git a/crates/engine/src/workflow/edge.rs b/crates/workflow/src/workflow/edge.rs similarity index 51% rename from crates/engine/src/workflow/edge.rs rename to crates/workflow/src/workflow/edge.rs index 7bbfaca..9fc3def 100644 --- a/crates/engine/src/workflow/edge.rs +++ b/crates/workflow/src/workflow/edge.rs @@ -1,14 +1,9 @@ use serde::{Deserialize, Serialize}; -use super::node::NodeId; -use crate::metadata::InputName; - 
-#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] -pub struct EdgeId(pub String); +use super::{InputName, NodeId}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Edge { - pub id: EdgeId, pub input: InputName, pub source: NodeId, pub target: NodeId, diff --git a/crates/workflow/src/workflow/node.rs b/crates/workflow/src/workflow/node.rs new file mode 100644 index 0000000..3b9214c --- /dev/null +++ b/crates/workflow/src/workflow/node.rs @@ -0,0 +1,16 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use super::{ComponentName, FunctionName, InputName}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Node { + /// The function to run + pub run: FunctionName, + /// The component to use from the dependencies + pub r#use: ComponentName, + /// Optional manual inputs to the function (wasm_wave encoded) + #[serde(default)] + pub with: HashMap, +} diff --git a/crates/workflow/src/workflow/types.rs b/crates/workflow/src/workflow/types.rs new file mode 100644 index 0000000..81036e6 --- /dev/null +++ b/crates/workflow/src/workflow/types.rs @@ -0,0 +1,47 @@ +use std::{fmt, ops::Deref}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct ComponentName(pub String); + +impl fmt::Display for ComponentName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "component:{}", self.0) + } +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FunctionName(pub String); + +impl fmt::Display for FunctionName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "func:{}", self.0) + } +} + +impl Deref for FunctionName { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct InputName(pub String); + +impl fmt::Display for InputName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> 
fmt::Result { + write!(f, "input:{}", self.0) + } +} + +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct NodeId(pub String); + +impl fmt::Display for NodeId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "node:{}", self.0) + } +} diff --git a/examples/hello-world.json b/examples/hello-world.json new file mode 100644 index 0000000..fec613b --- /dev/null +++ b/examples/hello-world.json @@ -0,0 +1,22 @@ +{ + "dependencies": { + "hello-world": "ghcr.io/wassemble/hello-world:0.1.0" + }, + "edges": [ + { + "input": "a", + "source": "hello-world1", + "target": "hello-world2" + } + ], + "nodes": { + "hello-world1": { + "run": "hello-world", + "use": "hello-world" + }, + "hello-world2": { + "run": "hello-world", + "use": "hello-world" + } + } +} diff --git a/examples/http.json b/examples/http.json new file mode 100644 index 0000000..7a20d47 --- /dev/null +++ b/examples/http.json @@ -0,0 +1,28 @@ +{ + "edges": [ + { + "source": "node1", + "target": "node2" + } + ], + "nodes": { + "node1": { + "component": "/target/wasm32-wasip1/debug/component_http.wasm", + "function": "handle", + "interface": "wasi:http/incoming-handler@0.2.5", + "position": { + "x": 0, + "y": 0 + } + }, + "node2": { + "args": [null, "!"], + "component": "/target/wasm32-wasip1/debug/demo.wasm", + "function": "and-world", + "position": { + "x": 100, + "y": 100 + } + } + } +} diff --git a/justfile b/justfile index 134c659..ad203c7 100644 --- a/justfile +++ b/justfile @@ -5,9 +5,6 @@ check: cargo machete cargo sort-derives -engine workflow: - WASMTIME_BACKTRACE_DETAILS=1 cargo run -p engine -- run -w ./engine/examples/{{workflow}}.json - install: cargo install --locked cargo-component cargo install --locked cargo-machete @@ -18,4 +15,7 @@ install: pnpm install parse workflow: - WASMTIME_BACKTRACE_DETAILS=1 cargo run -p engine -- parse -w ./engine/examples/{{workflow}}.json + WASMTIME_BACKTRACE_DETAILS=1 cargo run -p runtime -- parse -w 
./examples/{{workflow}}.json + +run workflow: + WASMTIME_BACKTRACE_DETAILS=1 cargo run -p runtime -- run -w ./examples/{{workflow}}.json