Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 170 additions & 57 deletions packages/atxp/src/commands/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,76 +4,180 @@ import os from 'os';
import { execSync } from 'child_process';

const NOTIFICATIONS_BASE_URL = 'https://clowdbot-notifications.corp.circuitandchisel.com';
const OPENCLAW_CONFIG_PATH = '/data/.openclaw/openclaw.json';
const SESSIONS_PATH = '/data/.openclaw/agents/main/sessions/sessions.json';
const WORKSPACE_DIR = '/data/.openclaw/workspace';
const HEARTBEAT_SECTION_HEADER = '# ATXP Notification Relay';

// eslint-disable-next-line no-control-regex
const sanitizeSessionValue = (s: string) => s.replace(/[\x00-\x1f"`[\]]/g, '');

interface EnableResponse {
instance?: { webhookUrl?: string; hooksToken?: string };
webhook?: { id?: string; url?: string; eventTypes?: string[]; secret?: string; enabled?: boolean };
error?: string;
}

interface NotificationChannel {
channel: string; // "telegram", "discord", "slack", etc.
to: string; // peer ID (chat ID, channel ID, etc.)
}

/**
* Discover connected messaging channels by reading the local session store.
* Parses session keys like "agent:main:telegram:direct:8204320066" to extract
* the channel type and peer ID for each active DM session.
*/
async function discoverConnectedChannels(): Promise<NotificationChannel[]> {
try {
const raw = await fs.readFile(SESSIONS_PATH, 'utf-8');
const sessions = JSON.parse(raw);
const channels: NotificationChannel[] = [];
const seen = new Set<string>();

for (const key of Object.keys(sessions)) {
// Match DM session keys: agent:main:<channel>:direct:<peerId>
const match = key.match(/^agent:main:([^:]+):direct:(.+)$/);
if (!match) continue;
const channel = sanitizeSessionValue(match[1]).slice(0, 64);
const to = sanitizeSessionValue(match[2]).slice(0, 128);
if (!channel || !to) continue;
// Skip ephemeral channels (webchat has no persistent address)
if (channel === 'webchat') continue;
const dedupKey = `${channel}:${to}`;
if (seen.has(dedupKey)) continue;
seen.add(dedupKey);
channels.push({ channel, to });
}
return channels;
} catch {
return []; // sessions file may not exist yet
}
}

/**
* Build the HEARTBEAT.md content with notification relay instructions.
* The default heartbeat prompt says "Read HEARTBEAT.md if it exists. Follow it strictly."
* so writing these instructions directly to the file ensures the LLM relays
* system events to all discovered channels.
*/
function buildHeartbeatMd(channels: NotificationChannel[]): string {
const lines = [
HEARTBEAT_SECTION_HEADER,
'',
'When you see System: event lines about new emails or SMS messages,',
'you MUST immediately relay each one to the user.',
'Do NOT reply HEARTBEAT_OK when System events are present.',
'',
];

if (channels.length > 0) {
lines.push('Relay to ALL of these channels:');
for (const c of channels) {
lines.push(`- Use the message tool with channel=\`${c.channel}\` and target=\`${c.to}\``);
}
lines.push('');
}

return lines.join('\n');
}

/**
* Configure hooks in openclaw.json on the running instance.
* Configure hooks, heartbeat delivery target, and HEARTBEAT.md on the instance.
* Only runs when inside a Fly instance (FLY_MACHINE_ID is set).
* Updates openclaw.json with the hooks token and restarts the gateway.
*
* Discovers all connected messaging channels from the session store, writes
* HEARTBEAT.md with relay instructions for each channel, and sets the primary
* delivery target to the first discovered channel.
*/
async function configureHooksOnInstance(hooksToken: string): Promise<void> {
if (!process.env.FLY_MACHINE_ID) return;

const configPath = '/data/.openclaw/openclaw.json';
const configPath = OPENCLAW_CONFIG_PATH;
try {
const raw = await fs.readFile(configPath, 'utf-8');
const config = JSON.parse(raw);

// Discover connected channels from session store
const channels = await discoverConnectedChannels();

let changed = false;

// Configure hooks
if (!config.hooks) config.hooks = {};
// Already configured with this token — skip
if (config.hooks.token === hooksToken && config.hooks.enabled === true) return;
if (config.hooks.token !== hooksToken || config.hooks.enabled !== true) {
config.hooks.enabled = true;
config.hooks.token = hooksToken;
changed = true;
}

config.hooks.enabled = true;
config.hooks.token = hooksToken;
await fs.writeFile(configPath, JSON.stringify(config, null, 2));
console.log(chalk.gray('Hooks configured in openclaw.json'));
// Set primary delivery target to first discovered channel
if (!config.agents) config.agents = {};
if (!config.agents.defaults) config.agents.defaults = {};
if (!config.agents.defaults.heartbeat) config.agents.defaults.heartbeat = {};
const hb = config.agents.defaults.heartbeat;

// Restart gateway to pick up new config (watchdog auto-restarts it)
try {
execSync('pkill -f openclaw-gateway', { stdio: 'ignore' });
console.log(chalk.gray('Gateway restarting to apply hooks config...'));
} catch {
// Gateway may not be running yet — config will be picked up on next start
if (channels.length > 0) {
const primary = channels[0];
if (hb.target !== primary.channel || hb.to !== primary.to) {
hb.target = primary.channel;
hb.to = primary.to;
changed = true;
}
} else if (hb.target !== 'last') {
// No channels discovered — fall back to 'last' and clear stale target
hb.target = 'last';
delete hb.to;
changed = true;
}
} catch {
// Non-fatal — hooks will be configured on next reboot via entrypoint config sync
}
}

/**
* Send a one-time instruction to the agent via /hooks/wake so it saves
* notification handling instructions to its HEARTBEAT.md file.
*
* Trust boundary: This intentionally injects a system-level prompt into the
* agent's memory. The webhook endpoint MUST validate the Authorization header
* before acting on the message content. The hardcoded text is controlled by us
* (not user input) and tells the agent how to broadcast notifications.
*/
async function sendHeartbeatInstruction(webhookUrl: string, hooksToken: string): Promise<void> {
try {
const res = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${hooksToken}`,
},
body: JSON.stringify({
text: 'SYSTEM: Push notifications are now enabled. When you receive a system event about a new email, proactively use the message tool to broadcast the notification to every connected channel. Check your available tools, find all messaging channels, and send the notification to each one individually. The goal is that the owner sees the notification regardless of which channel they are watching.',
mode: 'now',
}),
});
if (!res.ok) {
console.log(chalk.gray(`Note: Setup instruction returned HTTP ${res.status} — agent may not have received it.`));
if (changed) {
await fs.writeFile(configPath, JSON.stringify(config, null, 2));
console.log(chalk.gray('Hooks and heartbeat configured in openclaw.json'));
}

// Append notification relay instructions to HEARTBEAT.md.
// The default heartbeat prompt reads this file and follows it strictly.
await fs.mkdir(WORKSPACE_DIR, { recursive: true });
const heartbeatPath = `${WORKSPACE_DIR}/HEARTBEAT.md`;
const section = buildHeartbeatMd(channels);
let existing = '';
try { existing = await fs.readFile(heartbeatPath, 'utf-8'); } catch { /* file may not exist */ }
// Replace existing notification section or append if not present.
// Uses split-on-header to avoid regex edge cases with anchors/newlines.
const idx = existing.indexOf(HEARTBEAT_SECTION_HEADER);
if (idx !== -1) {
let before = existing.slice(0, idx);
// Ensure a newline separates preceding content from our section
if (before.length > 0 && !before.endsWith('\n')) before += '\n';
const afterHeader = existing.slice(idx + HEARTBEAT_SECTION_HEADER.length);
// Find next top-level heading. Assumes a preceding newline (standard markdown).
const nextHeading = afterHeader.search(/\n# /);
const after = nextHeading !== -1 ? afterHeader.slice(nextHeading) : '';
await fs.writeFile(heartbeatPath, before + section.trimEnd() + after);
} else {
console.log(chalk.gray('Notification instructions sent to your agent.'));
const separator = existing.length > 0 && !existing.endsWith('\n') ? '\n\n' : existing.length > 0 ? '\n' : '';
await fs.writeFile(heartbeatPath, existing + separator + section);
}
} catch {
console.log(chalk.gray('Note: Could not send setup instruction to instance.'));
console.log(chalk.gray('HEARTBEAT.md updated with notification relay instructions'));

if (channels.length > 0) {
console.log(chalk.gray(`Notification channels: ${channels.map(c => `${c.channel}:${c.to}`).join(', ')}`));
}

// Restart gateway to pick up new config (watchdog auto-restarts it)
if (changed) {
try {
execSync('pkill -f openclaw-gateway', { stdio: 'ignore' });
console.log(chalk.gray('Gateway restarting to apply config...'));
} catch {
// Gateway may not be running yet — config will be picked up on next start
}
}
} catch (err) {
console.log(chalk.yellow('Warning: Could not configure instance locally.'));
console.log(chalk.gray(`${err instanceof Error ? err.message : err}`));
console.log(chalk.gray('Hooks will be configured on next instance reboot.'));
}
}

Expand All @@ -91,9 +195,13 @@ function getMachineId(): string | undefined {
}

async function getAccountId(): Promise<string | undefined> {
const { getAccountInfo } = await import('./whoami.js');
const account = await getAccountInfo();
return account?.accountId;
try {
const { getAccountInfo } = await import('./whoami.js');
const account = await getAccountInfo();
return account?.accountId;
} catch {
return undefined;
}
}

async function enableNotifications(): Promise<void> {
Expand All @@ -112,11 +220,18 @@ async function enableNotifications(): Promise<void> {
const body: Record<string, string> = { machine_id: machineId };
if (accountId) body.account_id = accountId;

const res = await fetch(`${NOTIFICATIONS_BASE_URL}/notifications/enable`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
let res: Response;
try {
res = await fetch(`${NOTIFICATIONS_BASE_URL}/notifications/enable`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
} catch (err) {
console.error(chalk.red(`Error: Could not reach notifications service.`));
console.error(chalk.gray(`${err instanceof Error ? err.message : err}`));
process.exit(1);
}

const data = await res.json().catch(() => ({})) as EnableResponse;
if (!res.ok) {
Expand Down Expand Up @@ -144,9 +259,6 @@ async function enableNotifications(): Promise<void> {
console.log(chalk.gray('Save the secret — it will not be shown again.'));
console.log(chalk.gray('Use it to verify webhook signatures (HMAC-SHA256).'));
}

// Send one-time HEARTBEAT.md instruction to the agent
await sendHeartbeatInstruction(instance.webhookUrl, instance.hooksToken);
}

function showNotificationsHelp(): void {
Expand All @@ -156,6 +268,7 @@ function showNotificationsHelp(): void {
console.log();
console.log(chalk.bold('Available Events:'));
console.log(' ' + chalk.green('email.received') + ' ' + 'Triggered when an inbound email arrives');
console.log(' ' + chalk.green('sms.received') + ' ' + 'Triggered when an inbound SMS arrives');
console.log();
console.log(chalk.bold('Examples:'));
console.log(' npx atxp notifications enable');
Expand Down
2 changes: 1 addition & 1 deletion skills/atxp/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ Local contacts database for resolving names to phone numbers and emails. Stored

### Notifications

Enable push notifications so your agent receives a POST to its `/hooks/wake` endpoint when events happen (e.g., inbound email), instead of polling.
Enable push notifications so your agent receives a POST to its `/hooks/wake` endpoint when events happen (e.g., inbound email or SMS), instead of polling.

| Command | Cost | Description |
|---------|------|-------------|
Expand Down
Loading