From c3d808ca3c5de4e747b0069e2d1d385b6eb33722 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 06:05:38 -0500 Subject: [PATCH 01/12] docs: add multi-agent coordination guide Covers verified patterns from live multi-session testing: - channel semantics: direct/group (recipient:null) vs pub/sub fan-out - cross-session delivery: store-level, confirmed with live message IDs - three-layer awareness model: store delivery, human-terminal watcher, agent-session explicit polling - poll-before-respond discipline for preventing parallel-reply blindspots - watcher channel scope and dynamic channel resolution - summary table of patterns and routing behavior All commands and behaviors verified against live triveni.acomm store. --- docs/public/multi-agent-coordination.md | 163 ++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 docs/public/multi-agent-coordination.md diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md new file mode 100644 index 0000000..9c23ed2 --- /dev/null +++ b/docs/public/multi-agent-coordination.md @@ -0,0 +1,163 @@ +--- +status: stable +--- + +# Multi-Agent Coordination with AgenticComm + +Practical patterns for coordinating multiple AI agents through a shared AgenticComm store. All commands and behaviors here were verified in a live multi-session setup. + +--- + +## Channel Semantics + +AgenticComm supports two distinct communication patterns. Choose the right one for your coordination need. + +### Direct / Group channels — shared stream + +`acomm message send` writes one message to the channel stream. All participants read from the same stream; no per-recipient routing occurs. 
+ +```bash +# Send a message to a group channel +acomm message send 1 "Build complete — ready for review" --sender ci-agent --file agent.acomm --json +# → { "channel_id": 1, "message_id": 42, "status": "sent" } + +# Any participant can read it +acomm message list 1 --file agent.acomm --json +# → [ { "id": 42, "sender": "ci-agent", "recipient": null, "content": "Build complete..." } ] +``` + +`recipient` is `null` — the message belongs to the channel, not to a specific reader. + +### Pub/sub — fan-out with per-subscriber delivery records + +`subscribe` + `publish` creates one delivery record per registered subscriber. Each subscriber reads only their own entry. + +```bash +# Register subscribers +acomm subscribe updates meera --file agent.acomm +acomm subscribe updates ishika --file agent.acomm + +# Publish — fans out to all subscribers +acomm publish updates "sprint-started" --sender orchestrator --file agent.acomm --json +# → { "topic": "updates", "delivered_count": 2, "status": "published" } + +# Each subscriber reads only their own delivery +acomm receive 1 --recipient meera --file agent.acomm --json +# → [ { "recipient": "meera", "content": "sprint-started", ... } ] + +acomm receive 1 --recipient ishika --file agent.acomm --json +# → [ { "recipient": "ishika", "content": "sprint-started", ... } ] +``` + +`delivered_count: 2` confirms both subscribers received the message. Each agent's `receive --recipient` call returns only their own entry — no cross-delivery. + +--- + +## Cross-Session Delivery + +Messages written by one agent session are immediately readable by another session on the same store. This was confirmed in live testing: + +| Message | Sender | Session | Lamport | Timestamp | +|---------|--------|---------|---------|-----------| +| `real-ishika-probe:...` | ishika | Session A | 40 | 10:28:00 | +| `real-meera-ack:...` | meera | Session B | 41 | 10:30:15 | + +Sessions A and B are independent Claude Code processes with a ~2-minute gap between sends. 
Both messages persist in the shared store and are readable by either session. + +**What "cross-session delivery" means here:** the `.acomm` store is a local file. Any session with a path to that file can read and write it. There is no network hop; delivery latency is local I/O. + +--- + +## Awareness Model + +Cross-session delivery (store level) works, but agents becoming *aware* of new messages requires an additional step. There are three distinct layers: + +### Layer 1 — Store delivery ✓ + +`acomm message send` → message persisted in `.acomm`. Verifiable with `message list`. Works automatically. + +### Layer 2 — Human-terminal awareness (optional watcher) + +`acomm-notify.ps1` (or equivalent) is a polling loop that watches configured channels and prints new messages to its terminal window via `Write-Host`. The human watching that window sees near-real-time alerts. + +**Important:** the watcher must be: +1. Explicitly started (it does not auto-launch) +2. Configured to watch the channels where messages are expected + +```powershell +# Start watcher for specific channels +.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate' -IntervalSeconds 2 +``` + +### Layer 3 — Agent-session awareness (explicit polling) + +A running agent session does not receive `Write-Host` output from a separate watcher process. The agent's conversation thread is unaware of new messages unless it explicitly polls. + +**Observed behavior (live test, 2026-03-04):** When session A sent a message, session B's terminal showed no alert — session B remained oblivious until it explicitly called `message list`. 
+ +The reliable agent-side pattern is disciplined polling at the start of each interaction turn: + +```bash +# At the start of each turn, check for new messages since last read +acomm message list 1 --file agent.acomm --json +``` + +**Future path:** MCP resource subscriptions (`resources/subscribe` + `notifications/resources/updated`) would allow the MCP server to push notifications to an active agent session, eliminating the need for explicit polling. This is not yet implemented in agentchattr or Claude Code. + +--- + +## Poll-Before-Respond Discipline + +When multiple agents share a channel, they poll independently and cannot see each other's in-flight responses before their own turn completes. This creates a parallel-reply blindspot. + +**The problem:** Agent A and Agent B both see a question at lamport T. Both compose answers independently. Both post. The human receives two parallel responses that didn't account for each other. + +**The discipline:** Before posting a response in a shared channel, poll for new messages and check whether a peer has already responded since your last read. + +```bash +# Before sending, check the channel's current state +LAST=$(acomm message list 1 --file agent.acomm --json | python3 -c \ + "import sys,json; msgs=json.load(sys.stdin); print(msgs[-1]['id'] if msgs else 0)") + +# Only post if no peer has answered since your last check +if [ "$LAST" -le "$YOUR_LAST_KNOWN_ID" ]; then + acomm message send 1 "My response..." --sender agent-a --file agent.acomm +fi +``` + +The `delivered_count` from `publish` provides a similar gate for pub/sub workflows — if `delivered_count` is 0, no subscribers are registered and the message would be undelivered. + +--- + +## Watcher Channel Scope + +The watcher only surfaces messages from channels in its watch list. If a channel is created dynamically (outside the initial setup), it must be explicitly added to the watcher's `-Channels` argument. 
+ +```powershell +# Watcher that covers both standard and dynamic channels +.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate','trine-debate','my-new-channel' +``` + +Channel name resolution depends on the channel being registered in the store. If a channel was created after the watcher started, restart the watcher to pick up the updated channel map. + +--- + +## Runnable Example + +See `examples/pubsub-fanout-recipient-delivery.sh` for a self-contained, verifiable demonstration of the pub/sub fan-out and recipient-scoped delivery pattern: + +```bash +bash examples/pubsub-fanout-recipient-delivery.sh +# PASS: publish fan-out delivered to two subscribers with recipient-scoped receive output. +``` + +--- + +## Summary + +| Pattern | Use when | Recipient routing | +|---------|----------|-------------------| +| `message send` / `list` | All participants need the same message | `recipient: null` (channel stream) | +| `subscribe` + `publish` + `receive --recipient` | Each agent needs their own delivery record | `recipient: some(name)` per subscriber | +| Poll-before-respond | Preventing parallel duplicate replies | — discipline, not a command | +| Watcher loop | Human-terminal alerting | — optional, not agent-native | From 2c9a62f19d673d36d998460e7aa0dce14de7e1ae Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 05:44:32 -0500 Subject: [PATCH 02/12] docs(cli): clarify send/receive stream vs recipient-scoped delivery --- docs/public/cli-reference.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/public/cli-reference.md b/docs/public/cli-reference.md index 18e7507..65ecc76 100644 --- a/docs/public/cli-reference.md +++ b/docs/public/cli-reference.md @@ -98,6 +98,11 @@ Options: --file Path to .acomm store file ``` +**Behavior note:** +- `acomm send` writes to the channel stream. +- For direct/group channels, messages are channel-addressed (`recipient: null`) and visible to channel participants. 
+- `--recipient` is most useful for per-recipient records (for example pub/sub `publish` deliveries and broadcast copies). + **Example: Receive all messages from channel 1** ```bash From 7cc17aae7aa293ce4b6014b23ceeabd5253d9397 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 05:27:09 -0500 Subject: [PATCH 03/12] docs(examples): add verified pubsub fan-out delivery walkthrough --- docs/public/cli-reference.md | 5 ++ examples/README.md | 28 +++---- examples/pubsub-fanout-recipient-delivery.sh | 78 ++++++++++++++++++++ 3 files changed, 95 insertions(+), 16 deletions(-) create mode 100644 examples/pubsub-fanout-recipient-delivery.sh diff --git a/docs/public/cli-reference.md b/docs/public/cli-reference.md index 65ecc76..fc43849 100644 --- a/docs/public/cli-reference.md +++ b/docs/public/cli-reference.md @@ -341,6 +341,11 @@ Output: The `delivered_count` indicates how many subscribers received the message. If no subscribers are registered for the topic, the count is 0 and no messages are created. +**Coordination tip (multi-agent teams):** +- Use `subscribe` + `publish` for routing/fan-out. +- Before sending a follow-up response in collaborative channels, run a poll/read step first so you do not post a duplicate parallel reply. +- See runnable example: `examples/pubsub-fanout-recipient-delivery.sh`. + --- ### acomm history diff --git a/examples/README.md b/examples/README.md index 04fefa5..049afa2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,35 +1,31 @@ # AgenticComm Examples -Runnable examples demonstrating the AgenticComm Python SDK. +Runnable, no-cloud examples for common coordination patterns. ## Prerequisites ```bash -pip install agentic-comm -cargo install agentic-comm +acomm --version ``` +The examples below use only the `acomm` CLI and a temporary local `.acomm` store. + ## Examples | File | Description | |------|-------------| -| `01_basic_messaging.py` | Simplest possible example. 
Create a store, create a channel, send and receive messages. | -| `02_multi_agent.py` | Multi-agent coordination. Multiple agents sending tasks and status updates through shared channels. | -| `03_task_queue.py` | Task queue pattern. One agent posts work items, another agent picks them up and reports completion. | -| `04_code_review.py` | Code review handoff. Developer agent requests review, reviewer agent provides feedback through channels. | -| `05_broadcast.py` | Broadcasting pattern. System-wide announcements sent to all channels simultaneously. | -| `06_search_history.py` | Search and history. Finding past messages and reviewing channel history for context. | +| `pubsub-fanout-recipient-delivery.sh` | Verifies `subscribe + publish` fan-out and recipient-scoped delivery (`receive --recipient`). | ## Running ```bash -# All examples — no API key needed -python examples/01_basic_messaging.py -python examples/02_multi_agent.py -python examples/03_task_queue.py -python examples/04_code_review.py -python examples/05_broadcast.py -python examples/06_search_history.py +bash examples/pubsub-fanout-recipient-delivery.sh +``` + +Optional: pass a store path to keep artifacts for inspection. + +```bash +bash examples/pubsub-fanout-recipient-delivery.sh ./scratch/pubsub-demo.acomm ``` ## MCP Server diff --git a/examples/pubsub-fanout-recipient-delivery.sh b/examples/pubsub-fanout-recipient-delivery.sh new file mode 100644 index 0000000..e449736 --- /dev/null +++ b/examples/pubsub-fanout-recipient-delivery.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Demonstrates deterministic pub/sub coordination: +# 1) two subscribers register to one topic +# 2) one publish fans out to both +# 3) each recipient reads only their own delivery entry + +if ! 
command -v acomm >/dev/null 2>&1; then + echo "error: acomm CLI is required but was not found in PATH" >&2 + exit 1 +fi + +STORE_INPUT="${1:-}" +TOPIC="${TOPIC:-updates}" +SUBSCRIBER_A="${SUBSCRIBER_A:-meera}" +SUBSCRIBER_B="${SUBSCRIBER_B:-ishika}" +SENDER="${SENDER:-ci-agent}" +CONTENT="${CONTENT:-hello-topic}" +CHANNEL_ID=1 + +cleanup_store=0 +if [[ -n "$STORE_INPUT" ]]; then + STORE="$STORE_INPUT" +else + STORE="$(mktemp -t agentic-comm-pubsub-XXXXXX.acomm)" + cleanup_store=1 +fi + +cleanup() { + if [[ "$cleanup_store" -eq 1 ]]; then + rm -f "$STORE" + fi +} +trap cleanup EXIT + +rm -f "$STORE" +acomm init --json "$STORE" >/dev/null + +acomm subscribe --file "$STORE" --json "$TOPIC" "$SUBSCRIBER_A" >/dev/null +acomm subscribe --file "$STORE" --json "$TOPIC" "$SUBSCRIBER_B" >/dev/null + +publish_json="$(acomm publish --file "$STORE" --json --sender "$SENDER" "$TOPIC" "$CONTENT")" +if ! grep -q '"delivered_count":[[:space:]]*2' <<<"$publish_json"; then + echo "error: expected delivered_count=2, got:" >&2 + echo "$publish_json" >&2 + exit 1 +fi + +recv_a="$(acomm receive --file "$STORE" --json --recipient "$SUBSCRIBER_A" "$CHANNEL_ID")" +recv_b="$(acomm receive --file "$STORE" --json --recipient "$SUBSCRIBER_B" "$CHANNEL_ID")" + +if ! grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_A\"" <<<"$recv_a"; then + echo "error: expected recipient $SUBSCRIBER_A in receive output" >&2 + echo "$recv_a" >&2 + exit 1 +fi + +if ! 
grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_B\"" <<<"$recv_b"; then + echo "error: expected recipient $SUBSCRIBER_B in receive output" >&2 + echo "$recv_b" >&2 + exit 1 +fi + +if grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_B\"" <<<"$recv_a"; then + echo "error: cross-delivery detected in $SUBSCRIBER_A receive output" >&2 + echo "$recv_a" >&2 + exit 1 +fi + +if grep -q "\"recipient\":[[:space:]]*\"$SUBSCRIBER_A\"" <<<"$recv_b"; then + echo "error: cross-delivery detected in $SUBSCRIBER_B receive output" >&2 + echo "$recv_b" >&2 + exit 1 +fi + +echo "PASS: publish fan-out delivered to two subscribers with recipient-scoped receive output." +echo "store: $STORE" From 1689e8a8bed3476228ead97300bcd49966901431 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 16:03:10 -0500 Subject: [PATCH 04/12] docs(coordination): add watcher lifecycle wrapper usage --- docs/public/multi-agent-coordination.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index 9c23ed2..b9b36be 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -89,6 +89,23 @@ Cross-session delivery (store level) works, but agents becoming *aware* of new m .\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate' -IntervalSeconds 2 ``` +#### Watcher lifecycle wrappers (always-on operation) + +In Triveni-style operational setups, use lifecycle wrappers so the watcher can run continuously instead of being manually re-launched: + +```powershell +# Start background notifier process (writes pid metadata) +.\start-acomm-notifier.ps1 -Channels 'trine-handoff','trine-gate','trine-debate' -IntervalSeconds 2 + +# Check running/stale status +.\status-acomm-notifier.ps1 -Json + +# Stop notifier process and clear pid metadata +.\stop-acomm-notifier.ps1 +``` + +These wrappers maintain a daemon metadata file 
(pid + channels + interval) and can redirect watcher output/error to logs for operator inspection. + ### Layer 3 — Agent-session awareness (explicit polling) A running agent session does not receive `Write-Host` output from a separate watcher process. The agent's conversation thread is unaware of new messages unless it explicitly polls. From 27448c05a28c02a00ffe308a2667797dbb962c7e Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 16:34:01 -0500 Subject: [PATCH 05/12] =?UTF-8?q?docs(coordination):=20correct=20Layer=203?= =?UTF-8?q?=20future=20path=20=E2=80=94=20achievable=20today=20via=20plugi?= =?UTF-8?q?n?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous text stated MCP resources/subscribe was the only path to auto-injection. Corrected: the UserPromptSubmit hook pattern (Mira/MemSearch) achieves the same outcome today without any MCP protocol changes. A proper acomm plugin can be built using this pattern. MCP resources/subscribe remains aspirational for true mid-turn push. --- docs/public/multi-agent-coordination.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index b9b36be..38f4b75 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -119,7 +119,7 @@ The reliable agent-side pattern is disciplined polling at the start of each inte acomm message list 1 --file agent.acomm --json ``` -**Future path:** MCP resource subscriptions (`resources/subscribe` + `notifications/resources/updated`) would allow the MCP server to push notifications to an active agent session, eliminating the need for explicit polling. This is not yet implemented in agentchattr or Claude Code. 
+**Near-term path (achievable today):** Build an acomm Claude Code plugin using the same `UserPromptSubmit` hook injection pattern that tools like Mira and MemSearch already use. The plugin polls the acomm store at every turn boundary and injects new messages as system context automatically — eliminating the need for explicit agent polling. This requires building the plugin, not waiting for any MCP protocol changes. MCP `resources/subscribe` + `notifications/resources/updated` would enable true mid-turn push (no turn-boundary dependency) but that notification path is not yet implemented in Claude Code's MCP client. --- From 7e4ffaafd2eddab164c4fa80bf52d44b0939b826 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 16:51:07 -0500 Subject: [PATCH 06/12] =?UTF-8?q?docs(coordination):=20simplify=20Layer=20?= =?UTF-8?q?3=20path=20=E2=80=94=20hook=20only,=20not=20plugin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A UserPromptSubmit hook script outputting acomm deltas is sufficient. No MCP plugin architecture needed. Peer messages become ambient context, same mechanism as task list / code snippet injection already in use. --- docs/public/multi-agent-coordination.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index 38f4b75..c1b1862 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -119,7 +119,7 @@ The reliable agent-side pattern is disciplined polling at the start of each inte acomm message list 1 --file agent.acomm --json ``` -**Near-term path (achievable today):** Build an acomm Claude Code plugin using the same `UserPromptSubmit` hook injection pattern that tools like Mira and MemSearch already use. 
The plugin polls the acomm store at every turn boundary and injects new messages as system context automatically — eliminating the need for explicit agent polling. This requires building the plugin, not waiting for any MCP protocol changes. MCP `resources/subscribe` + `notifications/resources/updated` would enable true mid-turn push (no turn-boundary dependency) but that notification path is not yet implemented in Claude Code's MCP client. +**Near-term path (achievable today):** Wire a `UserPromptSubmit` hook that queries the acomm store for channel deltas and writes them to stdout. Claude Code injects the output as system context before the agent's turn begins — no explicit agent polling required. The agent becomes aware of peer messages as ambient context, the same way other hooks inject task lists, code snippets, and session state transparently. This requires only a hook script, not any MCP protocol changes. MCP `resources/subscribe` + `notifications/resources/updated` would enable true mid-turn push (no turn-boundary dependency) but that notification path is not yet implemented in Claude Code's MCP client. 
--- From 4340c688dc5b7a5173d27f4291ddab70ea416fa0 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 19:24:16 -0500 Subject: [PATCH 07/12] docs(coordination): remove Triveni-specific watcher references --- docs/public/multi-agent-coordination.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index c1b1862..8fe0089 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -86,22 +86,22 @@ Cross-session delivery (store level) works, but agents becoming *aware* of new m ```powershell # Start watcher for specific channels -.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate' -IntervalSeconds 2 +.\acomm-notify.ps1 -Channels 'handoff','gate' -IntervalSeconds 2 ``` #### Watcher lifecycle wrappers (always-on operation) -In Triveni-style operational setups, use lifecycle wrappers so the watcher can run continuously instead of being manually re-launched: +In operational setups, use lifecycle wrappers so the watcher can run continuously instead of being manually re-launched: ```powershell # Start background notifier process (writes pid metadata) -.\start-acomm-notifier.ps1 -Channels 'trine-handoff','trine-gate','trine-debate' -IntervalSeconds 2 +.\start-notifier.ps1 -Channels 'handoff','gate','debate' -IntervalSeconds 2 # Check running/stale status -.\status-acomm-notifier.ps1 -Json +.\status-notifier.ps1 -Json # Stop notifier process and clear pid metadata -.\stop-acomm-notifier.ps1 +.\stop-notifier.ps1 ``` These wrappers maintain a daemon metadata file (pid + channels + interval) and can redirect watcher output/error to logs for operator inspection. @@ -152,7 +152,7 @@ The watcher only surfaces messages from channels in its watch list. 
If a channel ```powershell # Watcher that covers both standard and dynamic channels -.\acomm-notify.ps1 -Channels 'trine-handoff','trine-gate','trine-debate','my-new-channel' +.\acomm-notify.ps1 -Channels 'handoff','gate','debate','my-new-channel' ``` Channel name resolution depends on the channel being registered in the store. If a channel was created after the watcher started, restart the watcher to pick up the updated channel map. From 4aaec37f2d6560362cdd976105fe91cc1fda4ab0 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 19:34:59 -0500 Subject: [PATCH 08/12] docs(coordination): remove agent-specific names and add hook pattern to summary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace meera/ishika subscriber/recipient examples with generic agent-a/agent-b - Replace real probe message content in cross-session table with generic placeholders - Add UserPromptSubmit hook row to summary table — the ambient context injection pattern --- docs/public/multi-agent-coordination.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/public/multi-agent-coordination.md b/docs/public/multi-agent-coordination.md index 8fe0089..6f0b03f 100644 --- a/docs/public/multi-agent-coordination.md +++ b/docs/public/multi-agent-coordination.md @@ -34,19 +34,19 @@ acomm message list 1 --file agent.acomm --json ```bash # Register subscribers -acomm subscribe updates meera --file agent.acomm -acomm subscribe updates ishika --file agent.acomm +acomm subscribe updates agent-a --file agent.acomm +acomm subscribe updates agent-b --file agent.acomm # Publish — fans out to all subscribers acomm publish updates "sprint-started" --sender orchestrator --file agent.acomm --json # → { "topic": "updates", "delivered_count": 2, "status": "published" } # Each subscriber reads only their own delivery -acomm receive 1 --recipient meera --file agent.acomm 
--json -# → [ { "recipient": "meera", "content": "sprint-started", ... } ] +acomm receive 1 --recipient agent-a --file agent.acomm --json +# → [ { "recipient": "agent-a", "content": "sprint-started", ... } ] -acomm receive 1 --recipient ishika --file agent.acomm --json -# → [ { "recipient": "ishika", "content": "sprint-started", ... } ] +acomm receive 1 --recipient agent-b --file agent.acomm --json +# → [ { "recipient": "agent-b", "content": "sprint-started", ... } ] ``` `delivered_count: 2` confirms both subscribers received the message. Each agent's `receive --recipient` call returns only their own entry — no cross-delivery. @@ -59,8 +59,8 @@ Messages written by one agent session are immediately readable by another sessio | Message | Sender | Session | Lamport | Timestamp | |---------|--------|---------|---------|-----------| -| `real-ishika-probe:...` | ishika | Session A | 40 | 10:28:00 | -| `real-meera-ack:...` | meera | Session B | 41 | 10:30:15 | +| `probe:session-a:...` | agent-a | Session A | 40 | 10:28:00 | +| `ack:session-b:...` | agent-b | Session B | 41 | 10:30:15 | Sessions A and B are independent Claude Code processes with a ~2-minute gap between sends. Both messages persist in the shared store and are readable by either session. 
@@ -178,3 +178,4 @@ bash examples/pubsub-fanout-recipient-delivery.sh | `subscribe` + `publish` + `receive --recipient` | Each agent needs their own delivery record | `recipient: some(name)` per subscriber | | Poll-before-respond | Preventing parallel duplicate replies | — discipline, not a command | | Watcher loop | Human-terminal alerting | — optional, not agent-native | +| `UserPromptSubmit` hook | Auto-injecting acomm deltas as ambient context at every agent turn start | — no manual poll required | From d0eba4266a7fa37b3042cc977d29e5ac03fdd087 Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 19:41:56 -0500 Subject: [PATCH 09/12] docs(example): use generic subscriber defaults --- examples/pubsub-fanout-recipient-delivery.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pubsub-fanout-recipient-delivery.sh b/examples/pubsub-fanout-recipient-delivery.sh index e449736..d99c2a1 100644 --- a/examples/pubsub-fanout-recipient-delivery.sh +++ b/examples/pubsub-fanout-recipient-delivery.sh @@ -13,8 +13,8 @@ fi STORE_INPUT="${1:-}" TOPIC="${TOPIC:-updates}" -SUBSCRIBER_A="${SUBSCRIBER_A:-meera}" -SUBSCRIBER_B="${SUBSCRIBER_B:-ishika}" +SUBSCRIBER_A="${SUBSCRIBER_A:-agent-a}" +SUBSCRIBER_B="${SUBSCRIBER_B:-agent-b}" SENDER="${SENDER:-ci-agent}" CONTENT="${CONTENT:-hello-topic}" CHANNEL_ID=1 From 3abcf6442283cb6f2079ee0f75d67fcefdbe668d Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Wed, 4 Mar 2026 01:39:31 -0500 Subject: [PATCH 10/12] feat(acomm): add one-shot chat poll/send subcommands --- crates/agentic-comm-cli/src/main.rs | 88 +++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/crates/agentic-comm-cli/src/main.rs b/crates/agentic-comm-cli/src/main.rs index 059260d..a2a46f9 100644 --- a/crates/agentic-comm-cli/src/main.rs +++ b/crates/agentic-comm-cli/src/main.rs @@ -99,6 +99,11 @@ enum Commands { 
#[command(subcommand)]
         action: RecvAction,
     },
+    /// Chat subcommands (poll, send)
+    Chat {
+        #[command(subcommand)]
+        action: ChatAction,
+    },
     /// Query subcommands (messages, channels, relationships, echoes, conversations)
     Query {
         #[command(subcommand)]
@@ -390,6 +395,38 @@ enum RecvAction {
     },
 }
 
+// ---------------------------------------------------------------------------
+// Chat subcommands
+// ---------------------------------------------------------------------------
+
+#[derive(Subcommand)]
+enum ChatAction {
+    /// Poll messages from a channel in one shot
+    Poll {
+        /// Channel ID
+        #[arg(long)]
+        channel: u64,
+        /// Optional Unix timestamp (seconds) lower bound
+        #[arg(long)]
+        since: Option<i64>,
+        /// Maximum messages to return
+        #[arg(long, default_value = "50")]
+        limit: usize,
+    },
+    /// Send one message to a channel
+    Send {
+        /// Channel ID
+        #[arg(long)]
+        channel: u64,
+        /// Message payload/content
+        #[arg(long)]
+        message: String,
+        /// Sender identifier
+        #[arg(long)]
+        sender: String,
+    },
+}
+
 // ---------------------------------------------------------------------------
 // Query subcommands
 // ---------------------------------------------------------------------------
@@ -1441,6 +1478,57 @@ fn main() {
             }
         },
 
+        // -----------------------------------------------------------------
+        // Chat subcommands (one-shot poll/send)
+        // -----------------------------------------------------------------
+        Commands::Chat { action } => match action {
+            ChatAction::Poll {
+                channel,
+                since,
+                limit,
+            } => {
+                let mut store = load_or_create(&store_path);
+                let since_dt = since.and_then(|ts| chrono::DateTime::<chrono::Utc>::from_timestamp(ts, 0));
+                match store.receive_messages(channel, None, since_dt) {
+                    Ok(mut msgs) => {
+                        msgs.truncate(limit);
+                        output(&serde_json::to_value(&msgs).unwrap(), json_mode);
+                    }
+                    Err(e) => {
+                        eprintln!("Error: {e}");
+                        std::process::exit(1);
+                    }
+                }
+            }
+            ChatAction::Send {
+                channel,
+                message,
+                sender,
+            } => {
+                let mut store = 
load_or_create(&store_path); + match store.send_message(channel, &sender, &message, MessageType::Text) { + Ok(msg) => { + output( + &serde_json::json!({ + "status": "sent", + "message_id": msg.id, + "channel_id": msg.channel_id, + "timestamp": msg.timestamp.to_rfc3339(), + }), + json_mode, + ); + if let Err(e) = store.save(&store_path) { + eprintln!("Warning: failed to save store: {e}"); + } + } + Err(e) => { + eprintln!("Error: {e}"); + std::process::exit(1); + } + } + } + }, + // ----------------------------------------------------------------- // Query subcommands // ----------------------------------------------------------------- From e53c56b0f6286a0b15ab7d2774818ad9899532df Mon Sep 17 00:00:00 2001 From: sampurnamo <157491025+sampurnamo@users.noreply.github.com> Date: Fri, 6 Mar 2026 03:38:08 -0500 Subject: [PATCH 11/12] agentic-comm(cli): implement daemon tcp relay --- Cargo.lock | 1 + crates/agentic-comm-cli/Cargo.toml | 1 + crates/agentic-comm-cli/src/main.rs | 242 +++++++++++++++++++++++++--- 3 files changed, 219 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0888e70..5e0a4a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,7 @@ dependencies = [ "chrono", "clap", "serde_json", + "tokio", ] [[package]] diff --git a/crates/agentic-comm-cli/Cargo.toml b/crates/agentic-comm-cli/Cargo.toml index 20772af..2e19082 100644 --- a/crates/agentic-comm-cli/Cargo.toml +++ b/crates/agentic-comm-cli/Cargo.toml @@ -16,3 +16,4 @@ agentic-comm = { workspace = true } clap = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } +tokio = { workspace = true } diff --git a/crates/agentic-comm-cli/src/main.rs b/crates/agentic-comm-cli/src/main.rs index a2a46f9..8707c57 100644 --- a/crates/agentic-comm-cli/src/main.rs +++ b/crates/agentic-comm-cli/src/main.rs @@ -1,6 +1,9 @@ //! CLI for agentic-comm: agent communication, channels, pub/sub. 
-use std::path::PathBuf;
+use std::{
+    path::{Path, PathBuf},
+    time::{Duration, SystemTime},
+};
 
 use agentic_comm::{
     AuditEntry, AuditEventType, ChannelConfig, ChannelType, CollectiveDecisionMode, CommStore,
@@ -8,6 +11,12 @@ use agentic_comm::{
     HiveRole, MessageFilter, MessageType, TemporalTarget, WorkspaceRole,
 };
 use clap::{Parser, Subcommand};
+use tokio::{
+    io::AsyncWriteExt,
+    net::TcpListener,
+    sync::broadcast,
+    time::{self, MissedTickBehavior},
+};
 
 /// Default store file path.
 fn default_store_path() -> PathBuf {
@@ -1155,6 +1164,105 @@ fn output(value: &serde_json::Value, json_mode: bool) {
     }
 }
 
+struct PidFileGuard {
+    path: PathBuf,
+}
+
+impl PidFileGuard {
+    fn new(path: PathBuf) -> Self {
+        Self { path }
+    }
+}
+
+impl Drop for PidFileGuard {
+    fn drop(&mut self) {
+        let _ = std::fs::remove_file(&self.path);
+    }
+}
+
+fn store_mtime(path: &Path) -> Option<SystemTime> {
+    std::fs::metadata(path).ok()?.modified().ok()
+}
+
+fn current_max_message_id(path: &Path) -> u64 {
+    if !path.exists() {
+        return 0;
+    }
+
+    CommStore::load(path)
+        .map(|store| store.messages.keys().copied().max().unwrap_or(0))
+        .unwrap_or(0)
+}
+
+fn collect_new_daemon_messages(path: &Path, last_seen_message_id: &mut u64) -> Vec<String> {
+    if !path.exists() {
+        return Vec::new();
+    }
+
+    let store = match CommStore::load(path) {
+        Ok(store) => store,
+        Err(e) => {
+            eprintln!("Warning: daemon could not load store {}: {e}", path.display());
+            return Vec::new();
+        }
+    };
+
+    let mut messages: Vec<_> = store.messages.values().cloned().collect();
+    messages.sort_by_key(|msg| msg.id);
+
+    let mut payloads = Vec::new();
+    for msg in messages {
+        if msg.id <= *last_seen_message_id {
+            continue;
+        }
+
+        let payload = serde_json::json!({
+            "sender": msg.sender,
+            "text": msg.content,
+            "timestamp": msg.timestamp.to_rfc3339(),
+            "channel": store
+                .get_channel(msg.channel_id)
+                .map(|channel| channel.name)
+                .unwrap_or_else(|| msg.channel_id.to_string()),
+            "lamport": msg.comm_timestamp.lamport,
+        });
+
+        
match serde_json::to_string(&payload) {
+            Ok(json) => payloads.push(json),
+            Err(e) => eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id),
+        }
+
+        *last_seen_message_id = msg.id;
+    }
+
+    payloads
+}
+
+fn pid_is_alive(pid: u32) -> bool {
+    #[cfg(windows)]
+    {
+        std::process::Command::new("tasklist")
+            .args(["/FI", &format!("PID eq {pid}"), "/FO", "CSV", "/NH"])
+            .output()
+            .map(|output| {
+                let stdout = String::from_utf8_lossy(&output.stdout);
+                stdout.contains(&format!("\"{pid}\""))
+            })
+            .unwrap_or(false)
+    }
+
+    #[cfg(not(windows))]
+    {
+        std::process::Command::new("kill")
+            .args(["-0", &pid.to_string()])
+            .stdout(std::process::Stdio::null())
+            .stderr(std::process::Stdio::null())
+            .status()
+            .map(|status| status.success())
+            .unwrap_or(false)
+    }
+}
+
 fn main() {
     let cli = Cli::parse();
     let store_path = resolve_store_path(cli.file);
@@ -2738,35 +2846,130 @@ fn main() {
                 .parent()
                 .unwrap_or_else(|| std::path::Path::new("."));
             let pid_path = data_dir.join("acomm.pid");
+            let stop_path = data_dir.join("acomm.stop");
             let pid = std::process::id();
 
             // Write PID file
             if let Err(e) = std::fs::write(&pid_path, pid.to_string()) {
                 eprintln!("Warning: could not write PID file: {e}");
             }
+            let _pid_guard = PidFileGuard::new(pid_path.clone());
+            let _ = std::fs::remove_file(&stop_path);
 
-            output(
-                &serde_json::json!({
-                    "status": "started",
-                    "pid": pid,
-                    "port": port,
-                    "data": data_path.display().to_string(),
-                    "pid_file": pid_path.display().to_string(),
-                    "note": "Daemon stub — exiting immediately (real daemon would loop)",
-                }),
-                json_mode,
-            );
+            let runtime = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap_or_else(|e| {
+                    eprintln!("Error: could not create tokio runtime: {e}");
+                    std::process::exit(1);
+                });
+
+            let daemon_result = runtime.block_on(async {
+                let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
+                let (sender, _) = broadcast::channel::<String>(256);
+                let mut ticker = 
time::interval(Duration::from_secs(1));
+                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+                let mut shutdown = Box::pin(tokio::signal::ctrl_c());
+                let mut last_seen_message_id = current_max_message_id(&data_path);
+                let mut last_mtime = store_mtime(&data_path);
+
+                output(
+                    &serde_json::json!({
+                        "status": "listening",
+                        "pid": pid,
+                        "port": port,
+                        "data": data_path.display().to_string(),
+                        "pid_file": pid_path.display().to_string(),
+                    }),
+                    json_mode,
+                );
+
+                loop {
+                    tokio::select! {
+                        _ = &mut shutdown => {
+                            break;
+                        }
+                        _ = ticker.tick() => {
+                            if stop_path.exists() {
+                                let _ = std::fs::remove_file(&stop_path);
+                                break;
+                            }
+
+                            let current_mtime = store_mtime(&data_path);
+                            if current_mtime != last_mtime {
+                                last_mtime = current_mtime;
+                                for message in collect_new_daemon_messages(&data_path, &mut last_seen_message_id) {
+                                    let _ = sender.send(message);
+                                }
+                            }
+                        }
+                        accept_result = listener.accept() => {
+                            let (mut stream, _) = accept_result?;
+                            let mut receiver = sender.subscribe();
+                            tokio::spawn(async move {
+                                while let Ok(message) = receiver.recv().await {
+                                    if stream.write_all(message.as_bytes()).await.is_err() {
+                                        break;
+                                    }
+                                    if stream.write_all(b"\n").await.is_err() {
+                                        break;
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+
+                Ok::<(), Box<dyn std::error::Error>>(())
+            });
+
+            if let Err(e) = daemon_result {
+                eprintln!("Error: daemon failed: {e}");
+                std::process::exit(1);
+            }
         }
         DaemonAction::Stop => {
             let data_dir = store_path
                 .parent()
                 .unwrap_or_else(|| std::path::Path::new("."));
             let pid_path = data_dir.join("acomm.pid");
+            let stop_path = data_dir.join("acomm.stop");
 
             if pid_path.exists() {
                 match std::fs::read_to_string(&pid_path) {
                     Ok(pid_str) => {
                         let pid_str = pid_str.trim();
+                        let pid = pid_str.parse::<u32>().ok();
+                        if let Err(e) = std::fs::write(&stop_path, "stop\n") {
+                            eprintln!("Warning: could not write stop file: {e}");
+                        }
+
+                        for _ in 0..10 {
+                            if !pid_path.exists() {
+                                output(
+                                    &serde_json::json!({
+                                        "status": "stopped",
+                                        "pid": pid_str,
+                                    }),
+                                    json_mode,
+                                );
+                                
return;
+                            }
+                            std::thread::sleep(Duration::from_millis(500));
+                        }
+
+                        if let Some(pid) = pid {
+                            #[cfg(windows)]
+                            let _ = std::process::Command::new("taskkill")
+                                .args(["/PID", &pid.to_string(), "/T", "/F"])
+                                .status();
+
+                            #[cfg(not(windows))]
+                            let _ = std::process::Command::new("kill")
+                                .arg(pid.to_string())
+                                .status();
+                        }
+
                         output(
                             &serde_json::json!({
                                 "status": "stopping",
@@ -2774,9 +2977,6 @@ fn main() {
                             }),
                             json_mode,
                         );
-                        if let Err(e) = std::fs::remove_file(&pid_path) {
-                            eprintln!("Warning: could not remove PID file: {e}");
-                        }
                     }
                     Err(e) => {
                         eprintln!("Error reading PID file: {e}");
@@ -2957,16 +3157,7 @@ fn main() {
                     let pid_alive = pid_str
                         .parse::<u32>()
                         .ok()
-                        .map(|pid| {
-                            // Check process existence via kill -0 on Unix
-                            std::process::Command::new("kill")
-                                .args(["-0", &pid.to_string()])
-                                .stdout(std::process::Stdio::null())
-                                .stderr(std::process::Stdio::null())
-                                .status()
-                                .map(|s| s.success())
-                                .unwrap_or(false)
-                        })
+                        .map(pid_is_alive)
                         .unwrap_or(false);
 
                     // Read PID file modification time as a proxy for start time
@@ -3395,3 +3586,4 @@ fn main() {
         }
     }
 }
+

From cd3927636e7751c7dfb744470d22c3c022ffe184 Mon Sep 17 00:00:00 2001
From: sampurnamo <157491025+sampurnamo@users.noreply.github.com>
Date: Sat, 7 Mar 2026 01:59:45 -0500
Subject: [PATCH 12/12] fix(cli): seed daemon relay backlog on connect

---
 crates/agentic-comm-cli/src/main.rs | 70 ++++++++++++++++++++++++-----
 1 file changed, 58 insertions(+), 12 deletions(-)

diff --git a/crates/agentic-comm-cli/src/main.rs b/crates/agentic-comm-cli/src/main.rs
index 8707c57..eed8793 100644
--- a/crates/agentic-comm-cli/src/main.rs
+++ b/crates/agentic-comm-cli/src/main.rs
@@ -1216,18 +1216,7 @@ fn collect_new_daemon_messages(path: &Path, last_seen_message_id: &mut u64) -> V
             continue;
         }
 
-        let payload = serde_json::json!({
-            "sender": msg.sender,
-            "text": msg.content,
-            "timestamp": msg.timestamp.to_rfc3339(),
-            "channel": store
-                .get_channel(msg.channel_id)
-                .map(|channel| channel.name)
-                
.unwrap_or_else(|| msg.channel_id.to_string()),
-            "lamport": msg.comm_timestamp.lamport,
-        });
-
-        match serde_json::to_string(&payload) {
+        match serialize_daemon_message(&store, &msg) {
             Ok(json) => payloads.push(json),
             Err(e) => eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id),
         }
@@ -1238,6 +1227,54 @@ fn collect_new_daemon_messages(path: &Path, last_seen_message_id: &mut u64) -> V
     payloads
 }
 
+fn collect_recent_daemon_messages(path: &Path, limit: usize) -> Vec<String> {
+    if !path.exists() {
+        return Vec::new();
+    }
+
+    let store = match CommStore::load(path) {
+        Ok(store) => store,
+        Err(e) => {
+            eprintln!("Warning: daemon could not load store {}: {e}", path.display());
+            return Vec::new();
+        }
+    };
+
+    let mut messages: Vec<_> = store.messages.values().cloned().collect();
+    messages.sort_by_key(|msg| msg.id);
+
+    let start = messages.len().saturating_sub(limit);
+    messages
+        .into_iter()
+        .skip(start)
+        .filter_map(|msg| match serialize_daemon_message(&store, &msg) {
+            Ok(json) => Some(json),
+            Err(e) => {
+                eprintln!("Warning: daemon could not serialize message {}: {e}", msg.id);
+                None
+            }
+        })
+        .collect()
+}
+
+fn serialize_daemon_message(
+    store: &CommStore,
+    msg: &agentic_comm::Message,
+) -> Result<String, serde_json::Error> {
+    let payload = serde_json::json!({
+        "sender": msg.sender,
+        "text": msg.content,
+        "timestamp": msg.timestamp.to_rfc3339(),
+        "channel": store
+            .get_channel(msg.channel_id)
+            .map(|channel| channel.name)
+            .unwrap_or_else(|| msg.channel_id.to_string()),
+        "lamport": msg.comm_timestamp.lamport,
+    });
+
+    serde_json::to_string(&payload)
+}
+
 fn pid_is_alive(pid: u32) -> bool {
     #[cfg(windows)]
     {
@@ -2906,7 +2943,16 @@ fn main() {
                 accept_result = listener.accept() => {
                     let (mut stream, _) = accept_result?;
                     let mut receiver = sender.subscribe();
+                    let recent_messages = collect_recent_daemon_messages(&data_path, 50);
                     tokio::spawn(async move {
+                        for message in recent_messages {
+                            if stream.write_all(message.as_bytes()).await.is_err() {
+                                
return; + } + if stream.write_all(b"\n").await.is_err() { + return; + } + } while let Ok(message) = receiver.recv().await { if stream.write_all(message.as_bytes()).await.is_err() { break;