📋 Summary
+{summary_escaped}
+diff --git a/.github/workflows/full-tests-with-api.yml b/.github/workflows/full-tests-with-api.yml index be54b987..b5881fcd 100644 --- a/.github/workflows/full-tests-with-api.yml +++ b/.github/workflows/full-tests-with-api.yml @@ -116,15 +116,23 @@ jobs: echo "test_exit_code=$TEST_EXIT_CODE" >> $GITHUB_ENV exit 0 # Don't fail here, we'll fail at the end after uploading artifacts - - name: Show service logs + - name: Save service logs to files if: always() working-directory: backends/advanced run: | - echo "=== Backend Logs (last 50 lines) ===" - docker compose -f docker-compose-test.yml logs --tail=50 chronicle-backend-test + echo "Checking running containers..." + docker compose -f docker-compose-test.yml ps -a echo "" - echo "=== Worker Logs (last 50 lines) ===" - docker compose -f docker-compose-test.yml logs --tail=50 workers-test + echo "Saving service logs to files..." + mkdir -p logs + docker compose -f docker-compose-test.yml logs chronicle-backend-test > logs/backend.log 2>&1 || true + docker compose -f docker-compose-test.yml logs workers-test > logs/workers.log 2>&1 || true + docker compose -f docker-compose-test.yml logs mongo-test > logs/mongo.log 2>&1 || true + docker compose -f docker-compose-test.yml logs redis-test > logs/redis.log 2>&1 || true + docker compose -f docker-compose-test.yml logs qdrant-test > logs/qdrant.log 2>&1 || true + docker compose -f docker-compose-test.yml logs speaker-service-test > logs/speaker.log 2>&1 || true + echo "✓ Logs saved to backends/advanced/logs/" + ls -lh logs/ - name: Check if test results exist if: always() @@ -200,6 +208,7 @@ jobs: with: name: robot-test-logs-full path: | + backends/advanced/logs/*.log backends/advanced/.env tests/setup/.env.test retention-days: 7 diff --git a/.github/workflows/pr-tests-with-api.yml b/.github/workflows/pr-tests-with-api.yml index 0ccff169..aeb45b1c 100644 --- a/.github/workflows/pr-tests-with-api.yml +++ b/.github/workflows/pr-tests-with-api.yml @@ -110,15 +110,23 @@ jobs: 
echo "test_exit_code=$TEST_EXIT_CODE" >> $GITHUB_ENV exit 0 # Don't fail here, we'll fail at the end after uploading artifacts - - name: Show service logs + - name: Save service logs to files if: always() working-directory: backends/advanced run: | - echo "=== Backend Logs (last 50 lines) ===" - docker compose -f docker-compose-test.yml logs --tail=50 chronicle-backend-test + echo "Checking running containers..." + docker compose -f docker-compose-test.yml ps -a echo "" - echo "=== Worker Logs (last 50 lines) ===" - docker compose -f docker-compose-test.yml logs --tail=50 workers-test + echo "Saving service logs to files..." + mkdir -p logs + docker compose -f docker-compose-test.yml logs chronicle-backend-test > logs/backend.log 2>&1 || true + docker compose -f docker-compose-test.yml logs workers-test > logs/workers.log 2>&1 || true + docker compose -f docker-compose-test.yml logs mongo-test > logs/mongo.log 2>&1 || true + docker compose -f docker-compose-test.yml logs redis-test > logs/redis.log 2>&1 || true + docker compose -f docker-compose-test.yml logs qdrant-test > logs/qdrant.log 2>&1 || true + docker compose -f docker-compose-test.yml logs speaker-service-test > logs/speaker.log 2>&1 || true + echo "✓ Logs saved to backends/advanced/logs/" + ls -lh logs/ - name: Check if test results exist if: always() @@ -242,6 +250,7 @@ jobs: with: name: robot-test-logs-pr-labeled path: | + backends/advanced/logs/*.log backends/advanced/.env tests/setup/.env.test retention-days: 7 diff --git a/.github/workflows/robot-tests.yml b/.github/workflows/robot-tests.yml index 486273dc..35e4dffa 100644 --- a/.github/workflows/robot-tests.yml +++ b/.github/workflows/robot-tests.yml @@ -85,15 +85,23 @@ jobs: echo "test_exit_code=$TEST_EXIT_CODE" >> $GITHUB_ENV exit 0 # Don't fail here, we'll fail at the end after uploading artifacts - - name: Show service logs + - name: Save service logs to files if: always() working-directory: backends/advanced run: | - echo "=== Backend Logs 
(last 50 lines) ===" - docker compose -f docker-compose-test.yml logs --tail=50 chronicle-backend-test + echo "Checking running containers..." + docker compose -f docker-compose-test.yml ps -a echo "" - echo "=== Worker Logs (last 50 lines) ===" - docker compose -f docker-compose-test.yml logs --tail=50 workers-test + echo "Saving service logs to files..." + mkdir -p logs + docker compose -f docker-compose-test.yml logs chronicle-backend-test > logs/backend.log 2>&1 || true + docker compose -f docker-compose-test.yml logs workers-test > logs/workers.log 2>&1 || true + docker compose -f docker-compose-test.yml logs mongo-test > logs/mongo.log 2>&1 || true + docker compose -f docker-compose-test.yml logs redis-test > logs/redis.log 2>&1 || true + docker compose -f docker-compose-test.yml logs qdrant-test > logs/qdrant.log 2>&1 || true + docker compose -f docker-compose-test.yml logs speaker-service-test > logs/speaker.log 2>&1 || true + echo "✓ Logs saved to backends/advanced/logs/" + ls -lh logs/ - name: Check if test results exist if: always() @@ -215,8 +223,9 @@ jobs: if: failure() uses: actions/upload-artifact@v4 with: - name: robot-test-logs + name: robot-test-logs-no-api path: | + backends/advanced/logs/*.log backends/advanced/.env tests/setup/.env.test retention-days: 7 diff --git a/CLAUDE.md b/CLAUDE.md index b16d1e8c..faed99c2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -325,6 +325,31 @@ OLLAMA_BASE_URL=http://ollama:11434 SPEAKER_SERVICE_URL=http://speaker-recognition:8085 ``` +### Plugin Security Architecture + +**Three-File Separation**: + +1. **backends/advanced/.env** - Secrets (gitignored) + ```bash + SMTP_PASSWORD=abcdefghijklmnop + OPENAI_API_KEY=sk-proj-... + ``` + +2. **config/plugins.yml** - Orchestration (uses env var references) + ```yaml + plugins: + email_summarizer: + enabled: true + smtp_password: ${SMTP_PASSWORD} # Reference, not actual value! + ``` + +3. 
**plugins/{plugin_id}/config.yml** - Non-secret defaults + ```yaml + subject_prefix: "Conversation Summary" + ``` + +**CRITICAL**: Never hardcode secrets in `config/plugins.yml`. Always use `${ENV_VAR}` syntax. + ## Quick API Reference ### Common Endpoints diff --git a/backends/advanced/.env.template b/backends/advanced/.env.template index 88617688..818b47b6 100644 --- a/backends/advanced/.env.template +++ b/backends/advanced/.env.template @@ -58,5 +58,49 @@ LANGFUSE_SECRET_KEY= # Tailscale auth key (for remote service access) TS_AUTHKEY= -# Home Assistant long-lived access token (for voice control plugin) +# ======================================== +# Plugin Configuration +# ======================================== +# Plugin-specific configuration is in: backends/advanced/src/advanced_omi_backend/plugins/{plugin_id}/config.yml +# Plugin orchestration (enabled, events) is in: config/plugins.yml +# This section contains ONLY plugin secrets + +# --------------------------------------- +# Home Assistant Plugin +# --------------------------------------- +# Enable in config/plugins.yml +# Configure in backends/advanced/src/advanced_omi_backend/plugins/homeassistant/config.yml + +# Home Assistant server URL +HA_URL=http://homeassistant.local:8123 + +# Home Assistant long-lived access token +# Get from: Profile → Security → Long-Lived Access Tokens HA_TOKEN= + +# Wake word for voice commands (optional, default: vivi) +HA_WAKE_WORD=vivi + +# Request timeout in seconds (optional, default: 30) +HA_TIMEOUT=30 + +# --------------------------------------- +# Email Summarizer Plugin +# --------------------------------------- +# Enable in config/plugins.yml +# Configure in backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/config.yml + +# SMTP server configuration +# For Gmail: Use App Password (requires 2FA enabled) +# 1. Go to Google Account → Security → 2-Step Verification +# 2. Scroll to "App passwords" → Generate password for "Mail" +# 3. 
Use the 16-character password below (no spaces) +SMTP_HOST=smtp.gmail.com +SMTP_PORT=587 +SMTP_USERNAME=your-email@gmail.com +SMTP_PASSWORD=your-app-password-here +SMTP_USE_TLS=true + +# Email sender information +FROM_EMAIL=noreply@chronicle.ai +FROM_NAME=Chronicle AI diff --git a/backends/advanced/Docs/plugin-configuration.md b/backends/advanced/Docs/plugin-configuration.md new file mode 100644 index 00000000..a4c7b222 --- /dev/null +++ b/backends/advanced/Docs/plugin-configuration.md @@ -0,0 +1,399 @@ +# Plugin Configuration Architecture + +Chronicle uses a clean separation of concerns for plugin configuration, dividing settings across three locations based on their purpose. + +## Configuration Files + +### 1. `config/plugins.yml` - Orchestration Only + +**Purpose**: Controls which plugins are enabled and what events they listen to + +**Contains**: +- Plugin enable/disable flags +- Event subscriptions +- Trigger conditions (wake words, etc.) + +**Example**: +```yaml +plugins: + email_summarizer: + enabled: true + events: + - conversation.complete + condition: + type: always + + homeassistant: + enabled: false + events: + - transcript.streaming + condition: + type: wake_word + wake_words: + - hey vivi +``` + +### 2. `backends/advanced/src/advanced_omi_backend/plugins/{plugin_id}/config.yml` - Plugin Settings + +**Purpose**: Plugin-specific non-secret configuration + +**Contains**: +- Feature flags +- Timeouts and limits +- Display preferences +- References to environment variables using `${VAR_NAME}` syntax + +**Example** (`plugins/email_summarizer/config.yml`): +```yaml +# Email content settings +subject_prefix: "Conversation Summary" +summary_max_sentences: 3 +include_conversation_id: true + +# SMTP config (reads from .env) +smtp_host: ${SMTP_HOST} +smtp_port: ${SMTP_PORT:-587} +smtp_username: ${SMTP_USERNAME} +smtp_password: ${SMTP_PASSWORD} +``` + +### 3. 
`backends/advanced/.env` - Secrets Only + +**Purpose**: All secret values (API keys, passwords, tokens) + +**Contains**: +- API keys +- Authentication tokens +- SMTP credentials +- Database passwords + +**Example**: +```bash +# Email Summarizer Plugin +SMTP_HOST=smtp.gmail.com +SMTP_PORT=587 +SMTP_USERNAME=your-email@gmail.com +SMTP_PASSWORD=your-app-password-here + +# Home Assistant Plugin +HA_URL=http://homeassistant.local:8123 +HA_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... +``` + +## Configuration Loading Process + +When a plugin is initialized, Chronicle merges configuration from all three sources: + +``` +1. Load plugins/{plugin_id}/config.yml + ↓ +2. Expand ${ENV_VAR} references from .env + ↓ +3. Merge orchestration settings from config/plugins.yml + ↓ +4. Pass complete config to plugin constructor +``` + +### Example Configuration Flow + +**Email Summarizer Plugin**: + +1. **Load** `plugins/email_summarizer/config.yml`: + ```yaml + subject_prefix: "Conversation Summary" + smtp_host: ${SMTP_HOST} + smtp_password: ${SMTP_PASSWORD} + ``` + +2. **Expand env vars** from `.env`: + ```yaml + subject_prefix: "Conversation Summary" + smtp_host: "smtp.gmail.com" # ← Expanded + smtp_password: "app-password-123" # ← Expanded + ``` + +3. **Merge orchestration** from `config/plugins.yml`: + ```yaml + enabled: true # ← Added + events: ["conversation.complete"] # ← Added + condition: {type: "always"} # ← Added + subject_prefix: "Conversation Summary" + smtp_host: "smtp.gmail.com" + smtp_password: "app-password-123" + ``` + +4. 
**Pass to plugin** constructor with complete config + +## Environment Variable Expansion + +Plugin config files use `${VAR_NAME}` syntax for environment variable references: + +- **Simple reference**: `${SMTP_HOST}` → expands to env value +- **With default**: `${SMTP_PORT:-587}` → uses 587 if SMTP_PORT not set +- **Missing vars**: Logs warning and keeps placeholder + +**Example**: +```yaml +# In plugin config.yml +smtp_host: ${SMTP_HOST} +smtp_port: ${SMTP_PORT:-587} +timeout: ${HA_TIMEOUT:-30} + +# With .env: +# SMTP_HOST=smtp.gmail.com +# (SMTP_PORT not set) +# HA_TIMEOUT=60 + +# Results in: +# smtp_host: "smtp.gmail.com" +# smtp_port: "587" # ← Used default +# timeout: "60" # ← From .env +``` + +## Creating a New Plugin + +To add a new plugin with proper configuration: + +### 1. Create plugin directory structure + +```bash +backends/advanced/src/advanced_omi_backend/plugins/my_plugin/ +├── __init__.py # Export plugin class +├── plugin.py # Plugin implementation +└── config.yml # Plugin-specific config +``` + +### 2. Add plugin config file + +**`plugins/my_plugin/config.yml`**: +```yaml +# My Plugin Configuration +# Non-secret settings only + +# Feature settings +feature_enabled: true +timeout: ${MY_PLUGIN_TIMEOUT:-30} + +# API configuration (secrets from .env) +api_url: ${MY_PLUGIN_API_URL} +api_key: ${MY_PLUGIN_API_KEY} +``` + +### 3. Add secrets to `.env.template` + +**`backends/advanced/.env.template`**: +```bash +# My Plugin +MY_PLUGIN_API_URL=https://api.example.com +MY_PLUGIN_API_KEY= +MY_PLUGIN_TIMEOUT=30 +``` + +### 4. Add orchestration settings + +**`config/plugins.yml`**: +```yaml +plugins: + my_plugin: + enabled: false + events: + - conversation.complete + condition: + type: always +``` + +### 5. 
Implement plugin class + +**`plugins/my_plugin/plugin.py`**: +```python +from ..base import BasePlugin, PluginContext, PluginResult + +class MyPlugin(BasePlugin): + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + # Config automatically merged from all sources + self.api_url = config.get('api_url') + self.api_key = config.get('api_key') + self.timeout = config.get('timeout', 30) + + async def initialize(self): + # Plugin initialization + pass + + async def on_conversation_complete(self, context: PluginContext): + # Event handler + pass +``` + +## Benefits of This Architecture + +✅ **Clean separation**: Secrets (.env) vs Config (yml) vs Orchestration (plugins.yml) + +✅ **Plugin portability**: Each plugin has self-contained config.yml + +✅ **No secret duplication**: Secrets only in .env, referenced via ${VAR} + +✅ **Easy discovery**: Want to configure a plugin? → `plugins/{plugin_id}/config.yml` + +✅ **Main config.yml stays clean**: No plugin pollution in main backend config + +✅ **Unified interface**: All plugins loaded with same pattern via `load_plugin_config()` + +## Troubleshooting + +### Plugin not loading + +**Check logs** for: +- "Plugin 'X' not found" → Directory/file structure issue +- "Environment variable 'X' not found" → Missing .env entry +- "Failed to load config.yml" → YAML syntax error + +**Verify**: +```bash +# Check plugin directory exists +ls backends/advanced/src/advanced_omi_backend/plugins/my_plugin/ + +# Validate config.yml syntax +python -c "import yaml; yaml.safe_load(open('plugins/my_plugin/config.yml'))" + +# Check .env has required vars +grep MY_PLUGIN .env +``` + +### Environment variables not expanding + +**Problem**: `${SMTP_HOST}` stays as literal text + +**Solution**: +- Ensure `.env` file exists in `backends/advanced/.env` +- Check variable name matches exactly (case-sensitive) +- Restart backend after .env changes +- Check logs for "Environment variable 'X' not found" warnings + +### Plugin enabled but not 
running + +**Check**: +1. `config/plugins.yml` has `enabled: true` +2. Plugin subscribed to correct events +3. Conditions are met (wake words, etc.) +4. Plugin initialized without errors (check logs) + +## Using Shared Setup Utilities in Plugin Setup Scripts + +Chronicle provides shared utilities (`setup_utils.py`) for creating interactive plugin setup wizards with password masking and existing value detection. + +### Quick Reference + +```python +#!/usr/bin/env python3 +import sys +from pathlib import Path + +# Import shared utilities +project_root = Path(__file__).resolve().parents[6] +sys.path.insert(0, str(project_root)) + +from setup_utils import ( + prompt_with_existing_masked, # Main function for masked prompts + prompt_value, # Simple value prompts + prompt_password, # Password with validation + mask_value, # Mask a value manually + read_env_value # Read from .env +) +from dotenv import set_key + +# Path to backend .env +env_path = str(project_root / "backends" / "advanced" / ".env") + +# Prompt for password/token with masking +api_key = prompt_with_existing_masked( + prompt_text="API Key", + env_file_path=env_path, + env_key="MY_PLUGIN_API_KEY", + placeholders=['your-key-here'], + is_password=True # ← Shows masked existing value +) + +# Save to .env +set_key(env_path, "MY_PLUGIN_API_KEY", api_key) +``` + +### Function Details + +**`prompt_with_existing_masked()`** - Primary function for secrets + +Shows masked existing values and allows users to reuse them: +```python +smtp_password = prompt_with_existing_masked( + prompt_text="SMTP Password", + env_file_path="../../.env", # Path to .env file + env_key="SMTP_PASSWORD", # Environment variable name + placeholders=['your-password-here'], # Values to treat as "not set" + is_password=True, # Use masking and hidden input + default="" # Fallback if no existing value +) +# Output: SMTP Password (smtp_***********word) [press Enter to reuse, or enter new]: +``` + +**Benefits:** +- ✅ Shows previously configured 
values as masked (e.g., `sk-pr***********xyz`) +- ✅ Lets users press Enter to keep existing value (no re-entry needed) +- ✅ Automatically reads from .env if path/key provided +- ✅ Works with placeholders - treats them as "not configured" + +**`prompt_password()`** - Password with validation + +```python +admin_pass = prompt_password( + prompt_text="Admin Password", + min_length=8, # Minimum length requirement + allow_generated=True # Auto-generate in non-interactive mode +) +``` + +**`prompt_value()`** - Simple value prompts + +```python +port = prompt_value("SMTP Port", default="587") +``` + +### Complete Plugin Setup Example + +See `backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/setup.py` for a complete working example showing: +- Masked password/token prompts with existing value reuse +- Saving credentials to backend .env +- Clean user-facing instructions +- Error handling + +### Best Practices + +1. **Always show masked values for secrets** - Use `is_password=True` +2. **Auto-read from .env** - Provide `env_file_path` and `env_key` parameters +3. **Use placeholders** - Define common placeholder values to detect "not configured" +4. **Save to backend .env** - All plugin secrets go in `backends/advanced/.env` +5. 
**Clear instructions** - Tell users what to do next (enable in plugins.yml, restart) + +### Convenience Functions + +For common patterns, use the convenience wrappers: + +```python +from setup_utils import prompt_api_key, prompt_token + +# API keys +openai_key = prompt_api_key("OpenAI", env_file_path="../../.env") +# Prompts: "OpenAI API Key" +# Env var: OPENAI_API_KEY + +# Auth tokens +ha_token = prompt_token("Home Assistant", env_file_path="../../.env") +# Prompts: "Home Assistant Token" +# Env var: HOME_ASSISTANT_TOKEN +``` + +## See Also + +- [CLAUDE.md](../../../CLAUDE.md) - Main documentation +- [Plugin Development Guide](plugin-development.md) - Creating custom plugins +- [Environment Variables](environment-variables.md) - Complete .env reference +- [setup_utils.py](../../../setup_utils.py) - Shared setup utility reference diff --git a/backends/advanced/docker-compose-test.yml b/backends/advanced/docker-compose-test.yml index 09cd04ca..999b37a2 100644 --- a/backends/advanced/docker-compose-test.yml +++ b/backends/advanced/docker-compose-test.yml @@ -17,7 +17,7 @@ services: - ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database - ./data/test_data:/app/data - ../../config:/app/config # Mount config directory with defaults.yml and config.yml - - ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/plugins.yml # Mount test plugins config + - ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/config/plugins.yml # Mount test plugins config to correct location environment: # Override with test-specific settings - MONGODB_URI=mongodb://mongo-test:27017/test_db @@ -169,7 +169,7 @@ services: - ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database - ./data/test_data:/app/data - ../../config:/app/config # Mount config directory with defaults.yml and config.yml - - ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/plugins.yml # Mount test plugins config + - 
${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/config/plugins.yml # Mount test plugins config to correct location environment: # Same environment as backend - MONGODB_URI=mongodb://mongo-test:27017/test_db diff --git a/backends/advanced/docker-compose.yml b/backends/advanced/docker-compose.yml index c5d718a3..230f40c9 100644 --- a/backends/advanced/docker-compose.yml +++ b/backends/advanced/docker-compose.yml @@ -56,6 +56,7 @@ services: - HA_TOKEN=${HA_TOKEN} - CORS_ORIGINS=http://localhost:3010,http://localhost:8000,http://192.168.1.153:3010,http://192.168.1.153:8000,https://localhost:3010,https://localhost:8000,https://100.105.225.45,https://localhost - REDIS_URL=redis://redis:6379/0 + - MONGODB_URI=mongodb://mongo:27017 depends_on: qdrant: condition: service_started @@ -101,6 +102,7 @@ services: - GROQ_API_KEY=${GROQ_API_KEY} - HA_TOKEN=${HA_TOKEN} - REDIS_URL=redis://redis:6379/0 + - MONGODB_URI=mongodb://mongo:27017 # Worker orchestrator configuration (optional - defaults shown) - WORKER_CHECK_INTERVAL=${WORKER_CHECK_INTERVAL:-10} - MIN_RQ_WORKERS=${MIN_RQ_WORKERS:-6} diff --git a/backends/advanced/docs/plugin-development-guide.md b/backends/advanced/docs/plugin-development-guide.md new file mode 100644 index 00000000..17c53b4a --- /dev/null +++ b/backends/advanced/docs/plugin-development-guide.md @@ -0,0 +1,776 @@ +# Chronicle Plugin Development Guide + +A comprehensive guide to creating custom plugins for Chronicle. + +## Table of Contents + +1. [Introduction](#introduction) +2. [Quick Start](#quick-start) +3. [Plugin Architecture](#plugin-architecture) +4. [Event Types](#event-types) +5. [Creating Your First Plugin](#creating-your-first-plugin) +6. [Configuration](#configuration) +7. [Testing Plugins](#testing-plugins) +8. [Best Practices](#best-practices) +9. [Examples](#examples) +10. 
[Troubleshooting](#troubleshooting) + +## Introduction + +Chronicle's plugin system allows you to extend functionality by subscribing to events and executing custom logic. Plugins are: + +- **Event-driven**: React to transcripts, conversations, or memory processing +- **Auto-discovered**: Drop plugins into the `plugins/` directory +- **Configurable**: YAML-based configuration with environment variable support +- **Isolated**: Each plugin runs independently with proper error handling + +### Plugin Types + +- **Core Plugins**: Built-in plugins (`homeassistant`, `test_event`) +- **Community Plugins**: Auto-discovered plugins in `plugins/` directory + +## Quick Start + +### 1. Generate Plugin Boilerplate + +```bash +cd backends/advanced +uv run python scripts/create_plugin.py my_awesome_plugin +``` + +This creates: +``` +plugins/my_awesome_plugin/ +├── __init__.py # Plugin exports +├── plugin.py # Main plugin logic +└── README.md # Plugin documentation +``` + +### 2. Implement Plugin Logic + +Edit `plugins/my_awesome_plugin/plugin.py`: + +```python +async def on_conversation_complete(self, context: PluginContext) -> Optional[PluginResult]: + """Handle conversation completion.""" + transcript = context.data.get('transcript', '') + + # Your custom logic here + print(f"Processing: {transcript}") + + return PluginResult(success=True, message="Processing complete") +``` + +### 3. Configure Plugin + +Add to `config/plugins.yml`: + +```yaml +plugins: + my_awesome_plugin: + enabled: true + events: + - conversation.complete + condition: + type: always +``` + +### 4. Restart Backend + +```bash +cd backends/advanced +docker compose restart +``` + +Your plugin will be auto-discovered and loaded! 
+ +## Plugin Architecture + +### Base Plugin Class + +All plugins inherit from `BasePlugin`: + +```python +from advanced_omi_backend.plugins.base import BasePlugin, PluginContext, PluginResult + +class MyPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['conversation'] # Which events you support + + async def initialize(self): + """Initialize resources (called on app startup)""" + pass + + async def cleanup(self): + """Clean up resources (called on app shutdown)""" + pass + + async def on_conversation_complete(self, context: PluginContext): + """Handle conversation.complete events""" + pass +``` + +### Plugin Context + +Context passed to plugin methods: + +```python +@dataclass +class PluginContext: + user_id: str # User identifier + event: str # Event name (e.g., "conversation.complete") + data: Dict[str, Any] # Event-specific data + metadata: Dict[str, Any] # Additional metadata +``` + +### Plugin Result + +Return value from plugin methods: + +```python +@dataclass +class PluginResult: + success: bool # Whether operation succeeded + data: Optional[Dict[str, Any]] # Optional result data + message: Optional[str] # Optional status message + should_continue: bool # Whether to continue normal processing (default: True) +``` + +## Event Types + +### 1. Transcript Events (`transcript.streaming`) + +**When**: Real-time transcript segments arrive from WebSocket +**Context Data**: +- `transcript` (str): The transcript text +- `segment_id` (str): Unique segment identifier +- `conversation_id` (str): Current conversation ID + +**Use Cases**: +- Wake word detection +- Real-time command processing +- Live transcript analysis + +**Example**: +```python +async def on_transcript(self, context: PluginContext): + transcript = context.data.get('transcript', '') + if 'urgent' in transcript.lower(): + await self.send_notification(transcript) +``` + +### 2. 
Conversation Events (`conversation.complete`) + +**When**: Conversation processing finishes +**Context Data**: +- `conversation` (dict): Full conversation data +- `transcript` (str): Complete transcript +- `duration` (float): Conversation duration in seconds +- `conversation_id` (str): Conversation identifier + +**Use Cases**: +- Email summaries +- Analytics tracking +- External integrations +- Conversation archiving + +**Example**: +```python +async def on_conversation_complete(self, context: PluginContext): + conversation = context.data.get('conversation', {}) + duration = context.data.get('duration', 0) + + if duration > 300: # 5 minutes + await self.archive_long_conversation(conversation) +``` + +### 3. Memory Events (`memory.processed`) + +**When**: Memory extraction finishes +**Context Data**: +- `memories` (list): Extracted memories +- `conversation` (dict): Source conversation +- `memory_count` (int): Number of memories created +- `conversation_id` (str): Conversation identifier + +**Use Cases**: +- Memory indexing +- Knowledge graph updates +- Memory notifications +- Analytics + +**Example**: +```python +async def on_memory_processed(self, context: PluginContext): + memories = context.data.get('memories', []) + + for memory in memories: + await self.index_memory(memory) +``` + +## Creating Your First Plugin + +### Step 1: Generate Boilerplate + +```bash +uv run python scripts/create_plugin.py todo_extractor +``` + +### Step 2: Define Plugin Logic + +```python +""" +Todo Extractor Plugin - Extracts action items from conversations. 
+""" +import logging +import re +from typing import Any, Dict, List, Optional + +from ..base import BasePlugin, PluginContext, PluginResult + +logger = logging.getLogger(__name__) + + +class TodoExtractorPlugin(BasePlugin): + """Extract and save action items from conversations.""" + + SUPPORTED_ACCESS_LEVELS = ['conversation'] + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self.todo_patterns = [ + r'I need to (.+)', + r'I should (.+)', + r'TODO: (.+)', + r'reminder to (.+)', + ] + + async def initialize(self): + if not self.enabled: + return + + logger.info("TodoExtractor plugin initialized") + + async def on_conversation_complete(self, context: PluginContext): + try: + transcript = context.data.get('transcript', '') + todos = self._extract_todos(transcript) + + if todos: + await self._save_todos(context.user_id, todos) + + return PluginResult( + success=True, + message=f"Extracted {len(todos)} action items", + data={'todos': todos} + ) + + return PluginResult(success=True, message="No action items found") + + except Exception as e: + logger.error(f"Error extracting todos: {e}") + return PluginResult(success=False, message=str(e)) + + def _extract_todos(self, transcript: str) -> List[str]: + """Extract todo items from transcript.""" + todos = [] + + for pattern in self.todo_patterns: + matches = re.findall(pattern, transcript, re.IGNORECASE) + todos.extend(matches) + + return list(set(todos)) # Remove duplicates + + async def _save_todos(self, user_id: str, todos: List[str]): + """Save todos to database or external service.""" + from advanced_omi_backend.database import get_database + + db = get_database() + for todo in todos: + await db['todos'].insert_one({ + 'user_id': user_id, + 'task': todo, + 'completed': False, + 'created_at': datetime.utcnow() + }) +``` + +### Step 3: Configure Plugin + +`config/plugins.yml`: + +```yaml +plugins: + todo_extractor: + enabled: true + events: + - conversation.complete + condition: + type: always 
+``` + +### Step 4: Test Plugin + +1. Restart backend: `docker compose restart` +2. Create a conversation with phrases like "I need to buy milk" +3. Check logs: `docker compose logs -f chronicle-backend | grep TodoExtractor` +4. Verify todos in database + +## Configuration + +### YAML Configuration + +`config/plugins.yml`: + +```yaml +plugins: + my_plugin: + # Basic Configuration + enabled: true # Enable/disable plugin + + # Event Subscriptions + events: + - conversation.complete + - memory.processed + + # Execution Conditions + condition: + type: always # always, wake_word, regex + # wake_words: ["hey assistant"] # For wake_word type + # pattern: "urgent" # For regex type + + # Custom Configuration + api_url: ${MY_API_URL} # Environment variable + timeout: 30 + max_retries: 3 +``` + +### Environment Variables + +Use `${VAR_NAME}` syntax: + +```yaml +api_key: ${MY_API_KEY} +base_url: ${BASE_URL:-http://localhost:8000} # With default +``` + +Add to `.env`: + +```bash +MY_API_KEY=your-key-here +BASE_URL=https://api.example.com +``` + +### Condition Types + +**Always Execute**: +```yaml +condition: + type: always +``` + +**Wake Word** (transcript events only): +```yaml +condition: + type: wake_word + wake_words: + - hey assistant + - computer +``` + +**Regex Pattern**: +```yaml +condition: + type: regex + pattern: "urgent|important" +``` + +## Testing Plugins + +### Unit Tests + +`tests/test_my_plugin.py`: + +```python +import pytest +from plugins.my_plugin import MyPlugin +from plugins.base import PluginContext + +class TestMyPlugin: + def test_plugin_initialization(self): + config = {'enabled': True, 'events': ['conversation.complete']} + plugin = MyPlugin(config) + assert plugin.enabled is True + + @pytest.mark.asyncio + async def test_conversation_processing(self): + plugin = MyPlugin({'enabled': True}) + await plugin.initialize() + + context = PluginContext( + user_id='test-user', + event='conversation.complete', + data={'transcript': 'Test transcript'} + ) + + 
result = await plugin.on_conversation_complete(context) + assert result.success is True +``` + +### Integration Testing + +1. **Enable Test Plugin**: +```yaml +test_event: + enabled: true + events: + - conversation.complete +``` + +2. **Check Logs**: +```bash +docker compose logs -f | grep "test_event" +``` + +3. **Upload Test Audio**: +```bash +curl -X POST http://localhost:8000/api/process-audio-files \ + -H "Authorization: Bearer $TOKEN" \ + -F "files=@test.wav" +``` + +### Manual Testing Checklist + +- [ ] Plugin loads without errors +- [ ] Configuration validates correctly +- [ ] Events trigger plugin execution +- [ ] Plugin logic executes successfully +- [ ] Errors are handled gracefully +- [ ] Logs provide useful information + +## Best Practices + +### 1. Error Handling + +Always wrap logic in try-except: + +```python +async def on_conversation_complete(self, context): + try: + # Your logic + result = await self.process(context) + return PluginResult(success=True, data=result) + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + return PluginResult(success=False, message=str(e)) +``` + +### 2. Logging + +Use appropriate log levels: + +```python +logger.debug("Detailed debug information") +logger.info("Important milestones") +logger.warning("Non-critical issues") +logger.error("Errors that need attention") +``` + +### 3. Resource Management + +Clean up in `cleanup()`: + +```python +async def initialize(self): + self.client = ExternalClient() + await self.client.connect() + +async def cleanup(self): + if self.client: + await self.client.disconnect() +``` + +### 4. Configuration Validation + +Validate in `initialize()`: + +```python +async def initialize(self): + if not self.config.get('api_key'): + raise ValueError("API key is required") + + if self.config.get('timeout', 0) <= 0: + raise ValueError("Timeout must be positive") +``` + +### 5. 
Async Best Practices + +Use `asyncio.to_thread()` for blocking operations: + +```python +import asyncio + +async def my_method(self): + # Run blocking operation in thread pool + result = await asyncio.to_thread(blocking_function, arg1, arg2) + return result +``` + +### 6. Database Access + +Use the global database handle: + +```python +from advanced_omi_backend.database import get_database + +async def save_data(self, data): + db = get_database() + await db['my_collection'].insert_one(data) +``` + +### 7. LLM Access + +Use the global LLM client: + +```python +from advanced_omi_backend.llm_client import async_generate + +async def generate_summary(self, text): + prompt = f"Summarize: {text}" + summary = await async_generate(prompt) + return summary +``` + +## Examples + +### Example 1: Slack Notifier + +```python +class SlackNotifierPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['conversation'] + + async def initialize(self): + self.webhook_url = self.config.get('slack_webhook_url') + if not self.webhook_url: + raise ValueError("Slack webhook URL required") + + async def on_conversation_complete(self, context): + transcript = context.data.get('transcript', '') + duration = context.data.get('duration', 0) + + message = { + "text": f"New conversation ({duration:.1f}s)", + "blocks": [{ + "type": "section", + "text": {"type": "mrkdwn", "text": f"```{transcript[:500]}```"} + }] + } + + async with aiohttp.ClientSession() as session: + await session.post(self.webhook_url, json=message) + + return PluginResult(success=True, message="Notification sent") +``` + +### Example 2: Keyword Alerter + +```python +class KeywordAlerterPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['transcript'] + + async def on_transcript(self, context): + transcript = context.data.get('transcript', '') + keywords = self.config.get('keywords', []) + + for keyword in keywords: + if keyword.lower() in transcript.lower(): + await self.send_alert(keyword, transcript) + return PluginResult( + 
success=True, + message=f"Alert sent for keyword: {keyword}" + ) + + return PluginResult(success=True) +``` + +### Example 3: Analytics Tracker + +```python +class AnalyticsTrackerPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['conversation', 'memory'] + + async def on_conversation_complete(self, context): + duration = context.data.get('duration', 0) + word_count = len(context.data.get('transcript', '').split()) + + await self.track_event('conversation_complete', { + 'user_id': context.user_id, + 'duration': duration, + 'word_count': word_count, + }) + + return PluginResult(success=True) + + async def on_memory_processed(self, context): + memory_count = context.data.get('memory_count', 0) + + await self.track_event('memory_processed', { + 'user_id': context.user_id, + 'memory_count': memory_count, + }) + + return PluginResult(success=True) +``` + +## Troubleshooting + +### Plugin Not Loading + +**Check logs**: +```bash +docker compose logs chronicle-backend | grep "plugin" +``` + +**Common issues**: +- Plugin directory name doesn't match class name convention +- Missing `__init__.py` or incorrect exports +- Syntax errors in plugin.py +- Not inheriting from `BasePlugin` + +**Solution**: +1. Verify directory structure matches: `plugins/my_plugin/` +2. Class name should be: `MyPluginPlugin` +3. Export in `__init__.py`: `from .plugin import MyPluginPlugin` + +### Plugin Enabled But Not Executing + +**Check**: +- Plugin enabled in `plugins.yml` +- Correct events subscribed +- Condition matches (wake_word, regex, etc.) + +**Debug**: +```python +async def on_conversation_complete(self, context): + logger.info(f"Plugin executed! 
Context: {context}") + # Your logic +``` + +### Configuration Errors + +**Error**: `Environment variable not found` + +**Solution**: +- Add variable to `.env` file +- Use default values: `${VAR:-default}` +- Check variable name spelling + +### Import Errors + +**Error**: `ModuleNotFoundError` + +**Solution**: +- Restart backend after adding dependencies +- Verify imports are from correct modules +- Check relative imports use `..base` for base classes + +### Database Connection Issues + +**Error**: `Database connection failed` + +**Solution**: +```python +from advanced_omi_backend.database import get_database + +async def my_method(self): + db = get_database() # Global database handle + # Use db... +``` + +## Advanced Topics + +### Custom Conditions + +Implement custom condition checking: + +```python +async def on_conversation_complete(self, context): + # Custom condition check + if not self._should_execute(context): + return PluginResult(success=True, message="Skipped") + + # Your logic + ... + +def _should_execute(self, context): + # Custom logic + duration = context.data.get('duration', 0) + return duration > 60 # Only process long conversations +``` + +### Plugin Dependencies + +Share data between plugins using context metadata: + +```python +# Plugin A +async def on_conversation_complete(self, context): + context.metadata['extracted_keywords'] = ['important', 'urgent'] + return PluginResult(success=True) + +# Plugin B (executes after Plugin A) +async def on_conversation_complete(self, context): + keywords = context.metadata.get('extracted_keywords', []) + # Use keywords... 
+``` + +### External Service Integration + +```python +import aiohttp + +class ExternalServicePlugin(BasePlugin): + async def initialize(self): + self.session = aiohttp.ClientSession() + self.api_url = self.config.get('api_url') + self.api_key = self.config.get('api_key') + + async def cleanup(self): + await self.session.close() + + async def on_conversation_complete(self, context): + async with self.session.post( + self.api_url, + headers={'Authorization': f'Bearer {self.api_key}'}, + json={'transcript': context.data.get('transcript')} + ) as response: + result = await response.json() + return PluginResult(success=True, data=result) +``` + +## Resources + +- **Base Plugin Class**: `backends/advanced/src/advanced_omi_backend/plugins/base.py` +- **Example Plugins**: + - Email Summarizer: `plugins/email_summarizer/` + - Home Assistant: `plugins/homeassistant/` + - Test Event: `plugins/test_event/` +- **Plugin Generator**: `scripts/create_plugin.py` +- **Configuration**: `config/plugins.yml.template` + +## Contributing Plugins + +Want to share your plugin with the community? + +1. Create a well-documented plugin +2. Add comprehensive README +3. Include configuration examples +4. Test thoroughly +5. Submit PR to Chronicle repository + +## Support + +- **GitHub Issues**: [chronicle-ai/chronicle/issues](https://github.com/chronicle-ai/chronicle/issues) +- **Discussions**: [chronicle-ai/chronicle/discussions](https://github.com/chronicle-ai/chronicle/discussions) +- **Documentation**: [Chronicle Docs](https://github.com/chronicle-ai/chronicle) + +Happy plugin development! 
🚀 diff --git a/backends/advanced/init.py b/backends/advanced/init.py index e566cc72..7aa4f6aa 100644 --- a/backends/advanced/init.py +++ b/backends/advanced/init.py @@ -5,7 +5,6 @@ """ import argparse -import getpass import os import platform import secrets @@ -22,9 +21,15 @@ from rich.prompt import Confirm, Prompt from rich.text import Text -# Add repo root to path for config_manager import +# Add repo root to path for imports sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) from config_manager import ConfigManager +from setup_utils import ( + prompt_password as util_prompt_password, + prompt_with_existing_masked, + mask_value, + read_env_value +) class ChronicleSetup: @@ -79,19 +84,8 @@ def prompt_value(self, prompt: str, default: str = "") -> str: return default def prompt_password(self, prompt: str) -> str: - """Prompt for password (hidden input)""" - while True: - try: - password = getpass.getpass(f"{prompt}: ") - if len(password) >= 8: - return password - self.console.print("[yellow][WARNING][/yellow] Password must be at least 8 characters") - except (EOFError, KeyboardInterrupt): - # For non-interactive environments, generate a secure password - self.console.print("[yellow][WARNING][/yellow] Non-interactive environment detected") - password = f"admin-{secrets.token_hex(8)}" - self.console.print(f"Generated secure password: {password}") - return password + """Prompt for password (delegates to shared utility)""" + return util_prompt_password(prompt, min_length=8, allow_generated=True) def prompt_choice(self, prompt: str, choices: Dict[str, str], default: str = "1") -> str: """Prompt for a choice from options""" @@ -140,29 +134,18 @@ def backup_existing_env(self): self.console.print(f"[blue][INFO][/blue] Backed up existing .env file to {backup_path}") def read_existing_env_value(self, key: str) -> str: - """Read a value from existing .env file""" - env_path = Path(".env") - if not env_path.exists(): - return None - - value = 
get_key(str(env_path), key) - # get_key returns None if key doesn't exist or value is empty - return value if value else None + """Read a value from existing .env file (delegates to shared utility)""" + return read_env_value(".env", key) def mask_api_key(self, key: str, show_chars: int = 5) -> str: - """Mask API key showing only first and last few characters""" - if not key or len(key) <= show_chars * 2: - return key - - # Remove quotes if present - key_clean = key.strip("'\"") - - return f"{key_clean[:show_chars]}{'*' * min(15, len(key_clean) - show_chars * 2)}{key_clean[-show_chars:]}" + """Mask API key (delegates to shared utility)""" + return mask_value(key, show_chars) def prompt_with_existing_masked(self, prompt_text: str, env_key: str, placeholders: list, is_password: bool = False, default: str = "") -> str: """ Prompt for a value, showing masked existing value from .env if present. + Delegates to shared utility from setup_utils. Args: prompt_text: The prompt to display @@ -174,25 +157,15 @@ def prompt_with_existing_masked(self, prompt_text: str, env_key: str, placeholde Returns: User input value, existing value if reused, or default """ - existing_value = self.read_existing_env_value(env_key) - - # Check if existing value is valid (not empty and not a placeholder) - has_valid_existing = existing_value and existing_value not in placeholders - - if has_valid_existing: - # Show masked value with option to reuse - if is_password: - masked = self.mask_api_key(existing_value) - display_prompt = f"{prompt_text} ({masked}) [press Enter to reuse, or enter new]" - else: - display_prompt = f"{prompt_text} ({existing_value}) [press Enter to reuse, or enter new]" - - user_input = self.prompt_value(display_prompt, "") - # If user pressed Enter (empty input), reuse existing value - return user_input if user_input else existing_value - else: - # No existing value, prompt normally - return self.prompt_value(prompt_text, default) + # Use shared utility with auto-read from 
.env + return prompt_with_existing_masked( + prompt_text=prompt_text, + env_file_path=".env", + env_key=env_key, + placeholders=placeholders, + is_password=is_password, + default=default + ) def setup_authentication(self): @@ -474,6 +447,16 @@ def setup_obsidian(self): self.console.print("[green][SUCCESS][/green] Obsidian/Neo4j configured") self.console.print("[blue][INFO][/blue] Neo4j will start automatically with --profile obsidian") + else: + # Explicitly disable Obsidian in config.yml when not enabled + self.config_manager.update_memory_config({ + "obsidian": { + "enabled": False, + "neo4j_host": "neo4j-mem0", + "timeout": 30 + } + }) + self.console.print("[blue][INFO][/blue] Obsidian/Neo4j integration disabled") def setup_network(self): """Configure network settings""" diff --git a/backends/advanced/scripts/create_plugin.py b/backends/advanced/scripts/create_plugin.py new file mode 100755 index 00000000..a38a3570 --- /dev/null +++ b/backends/advanced/scripts/create_plugin.py @@ -0,0 +1,437 @@ +#!/usr/bin/env python3 +""" +Plugin Generator Script for Chronicle. + +Creates boilerplate plugin structure with templates and examples. + +Usage: + uv run python scripts/create_plugin.py my_awesome_plugin +""" +import argparse +import os +import shutil +import sys +from pathlib import Path + + +def snake_to_pascal(snake_str: str) -> str: + """Convert snake_case to PascalCase.""" + return ''.join(word.capitalize() for word in snake_str.split('_')) + + +def create_plugin(plugin_name: str, force: bool = False): + """ + Create a new plugin with boilerplate structure. 
+ + Args: + plugin_name: Plugin name in snake_case (e.g., my_awesome_plugin) + force: Overwrite existing plugin if True + """ + # Validate plugin name + if not plugin_name.replace('_', '').isalnum(): + print(f"❌ Error: Plugin name must be alphanumeric with underscores") + print(f" Got: {plugin_name}") + print(f" Example: my_awesome_plugin") + sys.exit(1) + + # Convert to class name + class_name = snake_to_pascal(plugin_name) + 'Plugin' + + # Get plugins directory + script_dir = Path(__file__).parent + backend_dir = script_dir.parent + plugins_dir = backend_dir / 'src' / 'advanced_omi_backend' / 'plugins' + plugin_dir = plugins_dir / plugin_name + + # Check if plugin already exists + if plugin_dir.exists(): + if not force: + print(f"❌ Error: Plugin '{plugin_name}' already exists at {plugin_dir}") + print(f" Use --force to overwrite") + sys.exit(1) + else: + # Remove existing directory when using --force + print(f"🗑️ Removing existing plugin directory: {plugin_dir}") + shutil.rmtree(plugin_dir) + + # Create plugin directory + print(f"📁 Creating plugin directory: {plugin_dir}") + plugin_dir.mkdir(parents=True, exist_ok=True) + + # Create __init__.py + init_content = f'''""" +{class_name} for Chronicle. + +[Brief description of what your plugin does] +""" + +from .plugin import {class_name} + +__all__ = ['{class_name}'] +''' + + init_file = plugin_dir / '__init__.py' + print(f"📝 Creating {init_file}") + init_file.write_text(init_content) + + # Create plugin.py with template + plugin_content = f'''""" +{class_name} implementation. + +This plugin [describe what it does]. 
+""" +import logging +from typing import Any, Dict, List, Optional + +from ..base import BasePlugin, PluginContext, PluginResult + +logger = logging.getLogger(__name__) + + +class {class_name}(BasePlugin): + """ + [Plugin description] + + Subscribes to: [list events you want to subscribe to] + - transcript.streaming: Real-time transcript segments + - conversation.complete: When conversation finishes + - memory.processed: After memory extraction + + Configuration (config/plugins.yml): + {plugin_name}: + enabled: true + events: + - conversation.complete # Change to your event + condition: + type: always # or wake_word, regex, etc. + # Your custom config here: + my_setting: ${{MY_ENV_VAR}} + """ + + # Declare which access levels this plugin supports + # Options: 'transcript', 'conversation', 'memory' + SUPPORTED_ACCESS_LEVELS: List[str] = ['conversation'] + + def __init__(self, config: Dict[str, Any]): + """ + Initialize plugin with configuration. + + Args: + config: Plugin configuration from config/plugins.yml + """ + super().__init__(config) + + # Load your custom configuration + self.my_setting = config.get('my_setting', 'default_value') + + logger.info(f"{class_name} configuration loaded") + + async def initialize(self): + """ + Initialize plugin resources. + + Called during application startup. + Use this to: + - Connect to external services + - Initialize clients + - Validate configuration + - Set up resources + + Raises: + Exception: If initialization fails + """ + if not self.enabled: + logger.info(f"{class_name} is disabled, skipping initialization") + return + + logger.info(f"Initializing {class_name}...") + + # TODO: Add your initialization code here + # Example: + # self.client = SomeClient(self.my_setting) + # await self.client.connect() + + logger.info(f"✅ {class_name} initialized successfully") + + async def cleanup(self): + """ + Clean up plugin resources. + + Called during application shutdown. 
+ Use this to: + - Close connections + - Save state + - Release resources + """ + logger.info(f"{class_name} cleanup complete") + + # Implement the methods for events you subscribed to: + + async def on_transcript(self, context: PluginContext) -> Optional[PluginResult]: + """ + Handle transcript.streaming events. + + Context data contains: + - transcript: str - The transcript text + - segment_id: str - Unique segment identifier + - conversation_id: str - Current conversation ID + + For wake_word conditions, router adds: + - command: str - Command with wake word stripped + - original_transcript: str - Full transcript + + Args: + context: Plugin context with transcript data + + Returns: + PluginResult with success status and optional message + """ + # TODO: Implement if you subscribed to transcript.streaming + pass + + async def on_conversation_complete(self, context: PluginContext) -> Optional[PluginResult]: + """ + Handle conversation.complete events. + + Context data contains: + - conversation: dict - Full conversation data + - transcript: str - Complete transcript + - duration: float - Conversation duration + - conversation_id: str - Conversation identifier + + Args: + context: Plugin context with conversation data + + Returns: + PluginResult with success status and optional message + """ + try: + logger.info(f"Processing conversation complete event for user: {{context.user_id}}") + + # Extract data from context + conversation = context.data.get('conversation', {{}}) + transcript = context.data.get('transcript', '') + duration = context.data.get('duration', 0) + conversation_id = context.data.get('conversation_id', 'unknown') + + # TODO: Add your plugin logic here + # Example: + # - Process the transcript + # - Call external APIs + # - Store data + # - Trigger actions + + logger.info(f"Processed conversation {{conversation_id}}") + + return PluginResult( + success=True, + message="Processing complete", + data={{'conversation_id': conversation_id}} + ) + + except 
Exception as e: + logger.error(f"Error in {class_name}: {{e}}", exc_info=True) + return PluginResult( + success=False, + message=f"Error: {{str(e)}}" + ) + + async def on_memory_processed(self, context: PluginContext) -> Optional[PluginResult]: + """ + Handle memory.processed events. + + Context data contains: + - memories: list - Extracted memories + - conversation: dict - Source conversation + - memory_count: int - Number of memories created + - conversation_id: str - Conversation identifier + + Args: + context: Plugin context with memory data + + Returns: + PluginResult with success status and optional message + """ + # TODO: Implement if you subscribed to memory.processed + pass + + # Add your custom helper methods here: + + async def _my_helper_method(self, data: Any) -> Any: + """ + Example helper method. + + Args: + data: Input data + + Returns: + Processed data + """ + # TODO: Implement your helper logic + pass +''' + + plugin_file = plugin_dir / 'plugin.py' + print(f"📝 Creating {plugin_file}") + plugin_file.write_text(plugin_content) + + # Create README.md + readme_content = f'''# {class_name} + +[Brief description of what your plugin does] + +## Features + +- Feature 1 +- Feature 2 +- Feature 3 + +## Configuration + +### Step 1: Environment Variables + +Add to `backends/advanced/.env`: + +```bash +# {class_name} Configuration +MY_ENV_VAR=your-value-here +``` + +### Step 2: Plugin Configuration + +Add to `config/plugins.yml`: + +```yaml +plugins: + {plugin_name}: + enabled: true + events: + - conversation.complete # Change to your event + condition: + type: always + + # Your custom configuration + my_setting: ${{MY_ENV_VAR}} +``` + +### Step 3: Restart Backend + +```bash +cd backends/advanced +docker compose restart +``` + +## How It Works + +1. [Step 1 description] +2. [Step 2 description] +3. 
[Step 3 description] + +## Configuration Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `my_setting` | string | `default` | Description of setting | + +## Testing + +```bash +# Add testing instructions here +``` + +## Troubleshooting + +### Issue 1 + +Solution 1 + +### Issue 2 + +Solution 2 + +## Development + +### File Structure + +``` +plugins/{plugin_name}/ +├── __init__.py # Plugin exports +├── plugin.py # Main plugin logic +└── README.md # This file +``` + +## License + +MIT License - see project LICENSE file for details. +''' + + readme_file = plugin_dir / 'README.md' + print(f"📝 Creating {readme_file}") + readme_file.write_text(readme_content) + + # Print success message and next steps + print(f"\n✅ Plugin '{plugin_name}' created successfully!\n") + print(f"📁 Location: {plugin_dir}\n") + print(f"📋 Next steps:") + print(f" 1. Edit {plugin_file}") + print(f" - Implement your plugin logic") + print(f" - Choose which events to subscribe to") + print(f" - Add your configuration options") + print(f"") + print(f" 2. Update config/plugins.yml:") + print(f" ```yaml") + print(f" plugins:") + print(f" {plugin_name}:") + print(f" enabled: true") + print(f" events:") + print(f" - conversation.complete") + print(f" condition:") + print(f" type: always") + print(f" ```") + print(f"") + print(f" 3. Add environment variables to .env (if needed)") + print(f"") + print(f" 4. 
Restart backend:") + print(f" cd backends/advanced && docker compose restart") + print(f"") + print(f"📖 Resources:") + print(f" - Plugin development guide: docs/plugin-development-guide.md") + print(f" - Example plugin: plugins/email_summarizer/") + print(f" - Base plugin class: plugins/base.py") + + +def main(): + parser = argparse.ArgumentParser( + description='Create a new Chronicle plugin with boilerplate structure', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Examples: + uv run python scripts/create_plugin.py my_awesome_plugin + uv run python scripts/create_plugin.py slack_notifier + uv run python scripts/create_plugin.py todo_extractor --force + ''' + ) + parser.add_argument( + 'plugin_name', + help='Plugin name in snake_case (e.g., my_awesome_plugin)' + ) + parser.add_argument( + '--force', '-f', + action='store_true', + help='Overwrite existing plugin if it exists' + ) + + args = parser.parse_args() + + try: + create_plugin(args.plugin_name, force=args.force) + except KeyboardInterrupt: + print("\n\n❌ Plugin creation cancelled") + sys.exit(1) + except Exception as e: + print(f"\n❌ Error creating plugin: {e}") + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/backends/advanced/src/advanced_omi_backend/auth.py b/backends/advanced/src/advanced_omi_backend/auth.py index f1b7909a..2e14b8b0 100644 --- a/backends/advanced/src/advanced_omi_backend/auth.py +++ b/backends/advanced/src/advanced_omi_backend/auth.py @@ -224,6 +224,9 @@ async def create_admin_user_if_needed(): existing_admin = await user_db.get_by_email(ADMIN_EMAIL) if existing_admin: + logger.debug(f"existing_admin.id = {existing_admin.id}, type = {type(existing_admin.id)}") + logger.debug(f"str(existing_admin.id) = {str(existing_admin.id)}") + logger.debug(f"existing_admin.user_id = {existing_admin.user_id}") logger.info( f"✅ Admin user already exists: {existing_admin.user_id} ({existing_admin.email})" ) diff --git 
a/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py index edddd914..07a401a4 100644 --- a/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py +++ b/backends/advanced/src/advanced_omi_backend/clients/audio_stream_client.py @@ -303,9 +303,19 @@ async def stream_wav_file( async def close(self) -> None: """Close the WebSocket connection.""" if self.ws: - await self.ws.close() - self.ws = None - logger.info("WebSocket connection closed") + try: + # Add timeout to WebSocket close to prevent hanging + await asyncio.wait_for(self.ws.close(), timeout=2.0) + logger.info("WebSocket connection closed cleanly") + except asyncio.TimeoutError: + logger.warning("WebSocket close timed out after 2s, forcing close") + # Force close without waiting for handshake + if hasattr(self.ws, 'transport') and self.ws.transport: + self.ws.transport.close() + except Exception as e: + logger.error(f"Error during WebSocket close: {e}") + finally: + self.ws = None async def __aenter__(self) -> "AudioStreamClient": """Async context manager entry.""" diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 29d303b6..041bd06b 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -10,20 +10,17 @@ import logging import time import uuid -from pathlib import Path from fastapi import UploadFile from fastapi.responses import JSONResponse +from advanced_omi_backend.models.conversation import create_conversation +from advanced_omi_backend.models.user import User +from advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks from advanced_omi_backend.utils.audio_utils import ( AudioValidationError, validate_and_prepare_audio, ) -from 
advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks -from advanced_omi_backend.models.job import JobPriority -from advanced_omi_backend.models.user import User -from advanced_omi_backend.models.conversation import create_conversation -from advanced_omi_backend.models.conversation import Conversation logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -31,6 +28,7 @@ def generate_client_id(user: User, device_name: str) -> str: """Generate client ID for uploaded files.""" + logger.debug(f"Generating client ID - user.id={user.id}, type={type(user.id)}") user_id_suffix = str(user.id)[-6:] return f"{user_id_suffix}-{device_name}" @@ -39,8 +37,6 @@ async def upload_and_process_audio_files( user: User, files: list[UploadFile], device_name: str = "upload", - auto_generate_client: bool = True, - folder: str = None, source: str = "upload" ) -> dict: """ @@ -55,8 +51,7 @@ async def upload_and_process_audio_files( user: Authenticated user files: List of uploaded audio files device_name: Device identifier - auto_generate_client: Whether to auto-generate client ID - folder: Optional subfolder for audio storage (e.g., 'fixtures') + source: Source of the upload (e.g., 'upload', 'gdrive') """ try: if not files: @@ -84,14 +79,14 @@ async def upload_and_process_audio_files( content = await file.read() - # Generate audio UUID and timestamp + # Track external source for deduplication (Google Drive, etc.) 
+ external_source_id = None + external_source_type = None if source == "gdrive": - audio_uuid = getattr(file, "audio_uuid", None) - if not audio_uuid: - audio_logger.error(f"Missing audio_uuid for gdrive file: {file.filename}") - audio_uuid = str(uuid.uuid4()) - else: - audio_uuid = str(uuid.uuid4()) + external_source_id = getattr(file, "file_id", None) or getattr(file, "audio_uuid", None) + external_source_type = "gdrive" + if not external_source_id: + audio_logger.warning(f"Missing file_id for gdrive file: {file.filename}") timestamp = int(time.time() * 1000) # Validate and prepare audio (read format from WAV file) @@ -120,11 +115,12 @@ async def upload_and_process_audio_files( title = file.filename.rsplit('.', 1)[0][:50] if file.filename else "Uploaded Audio" conversation = create_conversation( - audio_uuid=audio_uuid, user_id=user.user_id, client_id=client_id, title=title, - summary="Processing uploaded audio file..." + summary="Processing uploaded audio file...", + external_source_id=external_source_id, + external_source_type=external_source_type, ) await conversation.insert() conversation_id = conversation.conversation_id # Get the auto-generated ID @@ -171,11 +167,13 @@ async def upload_and_process_audio_files( # Enqueue batch transcription job first (file uploads need transcription) from advanced_omi_backend.controllers.queue_controller import ( + JOB_RESULT_TTL, start_post_conversation_jobs, transcription_queue, - JOB_RESULT_TTL ) - from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job + from advanced_omi_backend.workers.transcription_jobs import ( + transcribe_full_audio_job, + ) version_id = str(uuid.uuid4()) transcribe_job_id = f"transcribe_{conversation_id[:12]}" @@ -183,14 +181,13 @@ async def upload_and_process_audio_files( transcription_job = transcription_queue.enqueue( transcribe_full_audio_job, conversation_id, - audio_uuid, version_id, "batch", # trigger job_timeout=1800, # 30 minutes result_ttl=JOB_RESULT_TTL, 
job_id=transcribe_job_id, description=f"Transcribe uploaded file {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id} + meta={'conversation_id': conversation_id, 'client_id': client_id} ) audio_logger.info(f"📥 Enqueued transcription job {transcription_job.id} for uploaded file") @@ -198,7 +195,6 @@ async def upload_and_process_audio_files( # Enqueue post-conversation processing job chain (depends on transcription) job_ids = start_post_conversation_jobs( conversation_id=conversation_id, - audio_uuid=audio_uuid, user_id=user.user_id, transcript_version_id=version_id, # Pass the version_id from transcription job depends_on_job=transcription_job, # Wait for transcription to complete @@ -208,7 +204,6 @@ async def upload_and_process_audio_files( processed_files.append({ "filename": file.filename, "status": "processing", - "audio_uuid": audio_uuid, "conversation_id": conversation_id, "transcript_job_id": transcription_job.id, "speaker_job_id": job_ids['speaker_recognition'], diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index c815bae3..f8afaf9d 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -109,10 +109,8 @@ async def get_conversation(conversation_id: str, user: User): # Build response with explicit curated fields response = { "conversation_id": conversation.conversation_id, - "audio_uuid": conversation.audio_uuid, "user_id": conversation.user_id, "client_id": conversation.client_id, - "audio_path": conversation.audio_path, "audio_chunks_count": conversation.audio_chunks_count, "audio_total_duration": conversation.audio_total_duration, "audio_compression_ratio": conversation.audio_compression_ratio, @@ -175,10 +173,8 @@ async def 
get_conversations(user: User, include_deleted: bool = False): for conv in user_conversations: conversations.append({ "conversation_id": conv.conversation_id, - "audio_uuid": conv.audio_uuid, "user_id": conv.user_id, "client_id": conv.client_id, - "audio_path": conv.audio_path, "audio_chunks_count": conv.audio_chunks_count, "audio_total_duration": conv.audio_total_duration, "audio_compression_ratio": conv.audio_compression_ratio, @@ -248,7 +244,6 @@ async def _hard_delete_conversation(conversation: Conversation) -> JSONResponse: """Permanently delete conversation and chunks (admin only).""" conversation_id = conversation.conversation_id client_id = conversation.client_id - audio_uuid = conversation.audio_uuid # Delete conversation document await conversation.delete() @@ -268,8 +263,7 @@ async def _hard_delete_conversation(conversation: Conversation) -> JSONResponse: "message": f"Successfully permanently deleted conversation '{conversation_id}'", "deleted_chunks": deleted_chunks, "conversation_id": conversation_id, - "client_id": client_id, - "audio_uuid": audio_uuid + "client_id": client_id } ) @@ -411,8 +405,6 @@ async def reprocess_transcript(conversation_id: str, user: User): return JSONResponse(status_code=403, content={"error": "Access forbidden. 
You can only reprocess your own conversations."}) # Get audio_uuid from conversation - audio_uuid = conversation_model.audio_uuid - # Validate audio chunks exist in MongoDB chunks = await AudioChunkDocument.find( AudioChunkDocument.conversation_id == conversation_id @@ -439,14 +431,13 @@ async def reprocess_transcript(conversation_id: str, user: User): transcript_job = transcription_queue.enqueue( transcribe_full_audio_job, conversation_id, - audio_uuid, version_id, "reprocess", job_timeout=600, result_ttl=JOB_RESULT_TTL, job_id=f"reprocess_{conversation_id[:8]}", description=f"Transcribe audio for {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta={'conversation_id': conversation_id} ) logger.info(f"📥 RQ: Enqueued transcription job {transcript_job.id}") @@ -468,7 +459,7 @@ async def reprocess_transcript(conversation_id: str, user: User): result_ttl=JOB_RESULT_TTL, job_id=f"speaker_{conversation_id[:8]}", description=f"Recognize speakers for {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta={'conversation_id': conversation_id} ) speaker_dependency = speaker_job # Chain for next job logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id} (depends on {transcript_job.id})") @@ -486,7 +477,7 @@ async def reprocess_transcript(conversation_id: str, user: User): result_ttl=JOB_RESULT_TTL, job_id=f"memory_{conversation_id[:8]}", description=f"Extract memories for {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + meta={'conversation_id': conversation_id} ) if speaker_job: logger.info(f"📥 RQ: Enqueued memory job {memory_job.id} (depends on speaker job {speaker_job.id})") diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index f8710718..9cd374e0 100644 --- 
a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -211,15 +211,15 @@ def get_jobs( } -def all_jobs_complete_for_session(session_id: str) -> bool: +def all_jobs_complete_for_client(client_id: str) -> bool: """ - Check if all jobs associated with a session are in terminal states. + Check if all jobs associated with a client are in terminal states. - Only checks jobs with audio_uuid in job.meta (no backward compatibility). + Checks jobs with client_id in job.meta. Traverses dependency chains to include dependent jobs. Args: - session_id: The audio_uuid (session ID) to check jobs for + client_id: The client device identifier to check jobs for Returns: True if all jobs are complete (or no jobs found), False if any job is still processing @@ -248,7 +248,7 @@ def is_job_complete(job): return True - # Find all jobs for this session + # Find all jobs for this client all_queues = [transcription_queue, memory_queue, audio_queue, default_queue] for queue in all_queues: registries = [ @@ -266,8 +266,8 @@ def is_job_complete(job): try: job = Job.fetch(job_id, connection=redis_conn) - # Only check jobs with audio_uuid in meta - if job.meta and job.meta.get('audio_uuid') == session_id: + # Only check jobs with client_id in meta + if job.meta and job.meta.get('client_id') == client_id: if not is_job_complete(job): return False except Exception as e: @@ -289,7 +289,7 @@ def start_streaming_jobs( 2. 
Audio persistence job - writes audio chunks to WAV file (file rotation per conversation) Args: - session_id: Stream session ID (audio_uuid) + session_id: Stream session ID (equals client_id for streaming) user_id: User identifier client_id: Client identifier @@ -313,7 +313,7 @@ def start_streaming_jobs( failure_ttl=86400, # Cleanup failed jobs after 24h job_id=f"speech-detect_{session_id[:12]}", description=f"Listening for speech...", - meta={'audio_uuid': session_id, 'client_id': client_id, 'session_level': True} + meta={'client_id': client_id, 'session_level': True} ) # Log job enqueue with TTL information for debugging actual_ttl = redis_conn.ttl(f"rq:job:{speech_job.id}") @@ -346,7 +346,7 @@ def start_streaming_jobs( failure_ttl=86400, # Cleanup failed jobs after 24h job_id=f"audio-persist_{session_id[:12]}", description=f"Audio persistence for session {session_id[:12]}", - meta={'audio_uuid': session_id, 'session_level': True} # Mark as session-level job + meta={'client_id': client_id, 'session_level': True} # Mark as session-level job ) # Log job enqueue with TTL information for debugging actual_ttl = redis_conn.ttl(f"rq:job:{audio_job.id}") @@ -366,7 +366,6 @@ def start_streaming_jobs( def start_post_conversation_jobs( conversation_id: str, - audio_uuid: str, user_id: str, transcript_version_id: Optional[str] = None, depends_on_job = None, @@ -386,7 +385,6 @@ def start_post_conversation_jobs( Args: conversation_id: Conversation identifier - audio_uuid: Audio UUID for job tracking user_id: User identifier transcript_version_id: Transcript version ID (auto-generated if None) depends_on_job: Optional job dependency for first job (e.g., transcription for file uploads) @@ -402,7 +400,7 @@ def start_post_conversation_jobs( version_id = transcript_version_id or str(uuid.uuid4()) # Build job metadata (include client_id if provided for UI tracking) - job_meta = {'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + job_meta = {'conversation_id': 
conversation_id} if client_id: job_meta['client_id'] = client_id @@ -416,7 +414,7 @@ def start_post_conversation_jobs( if speaker_enabled: speaker_job_id = f"speaker_{conversation_id[:12]}" - logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}") speaker_job = transcription_queue.enqueue( recognise_speakers_job, @@ -440,7 +438,7 @@ def start_post_conversation_jobs( # Step 2: Memory extraction job # Depends on speaker job if it was created, otherwise depends on upstream (transcription or nothing) memory_job_id = f"memory_{conversation_id[:12]}" - logger.info(f"🔍 DEBUG: Creating memory job with job_id={memory_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + logger.info(f"🔍 DEBUG: Creating memory job with job_id={memory_job_id}, conversation_id={conversation_id[:12]}") memory_job = memory_queue.enqueue( process_memory_job, @@ -462,7 +460,7 @@ def start_post_conversation_jobs( # Step 3: Title/summary generation job # Depends on speaker job if enabled, otherwise on upstream dependency title_job_id = f"title_summary_{conversation_id[:12]}" - logger.info(f"🔍 DEBUG: Creating title/summary job with job_id={title_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + logger.info(f"🔍 DEBUG: Creating title/summary job with job_id={title_job_id}, conversation_id={conversation_id[:12]}") title_summary_job = default_queue.enqueue( generate_title_summary_job, @@ -484,14 +482,13 @@ def start_post_conversation_jobs( # Step 5: Dispatch conversation.complete event (runs after both memory and title/summary complete) # This ensures plugins receive the event after all processing is done event_job_id = f"event_complete_{conversation_id[:12]}" - logger.info(f"🔍 DEBUG: Creating conversation complete event job with 
job_id={event_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + logger.info(f"🔍 DEBUG: Creating conversation complete event job with job_id={event_job_id}, conversation_id={conversation_id[:12]}") # Event job depends on both memory and title/summary jobs completing # Use RQ's depends_on list to wait for both event_dispatch_job = default_queue.enqueue( dispatch_conversation_complete_event_job, conversation_id, - audio_uuid, client_id or "", user_id, job_timeout=120, # 2 minutes diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index 575e4dcb..bff60037 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -284,11 +284,13 @@ async def save_diarization_settings_controller(settings: dict): "min_duration_off", "min_speakers", "max_speakers" } + # Filter to only valid keys (allow round-trip GET→POST) + filtered_settings = {} for key, value in settings.items(): if key not in valid_keys: - raise HTTPException(status_code=400, detail=f"Invalid setting key: {key}") + continue # Skip unknown keys instead of rejecting - # Type validation + # Type validation for known keys only if key in ["min_speakers", "max_speakers"]: if not isinstance(value, int) or value < 1 or value > 20: raise HTTPException(status_code=400, detail=f"Invalid value for {key}: must be integer 1-20") @@ -299,13 +301,19 @@ async def save_diarization_settings_controller(settings: dict): if not isinstance(value, (int, float)) or value < 0: raise HTTPException(status_code=400, detail=f"Invalid value for {key}: must be positive number") + filtered_settings[key] = value + + # Reject if NO valid keys provided (completely invalid request) + if not filtered_settings: + raise HTTPException(status_code=400, detail="No valid diarization settings provided") 
+ # Get current settings and merge with new values current_settings = load_diarization_settings() - current_settings.update(settings) + current_settings.update(filtered_settings) # Save using OmegaConf if save_diarization_settings(current_settings): - logger.info(f"Updated and saved diarization settings: {settings}") + logger.info(f"Updated and saved diarization settings: {filtered_settings}") return { "message": "Diarization settings saved successfully", diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index 1f05e497..79bb56fc 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -838,8 +838,7 @@ async def _process_batch_audio_complete( f"📦 Batch mode: Combined {len(client_state.batch_audio_chunks)} chunks into {len(complete_audio)} bytes" ) - # Generate audio UUID and timestamp - audio_uuid = str(uuid.uuid4()) + # Timestamp for logging timestamp = int(time.time() * 1000) # Get audio format from batch metadata (set during audio-start) @@ -859,7 +858,6 @@ async def _process_batch_audio_complete( version_id = str(uuid.uuid4()) conversation = create_conversation( - audio_uuid=audio_uuid, user_id=user_id, client_id=client_id, title="Batch Recording", @@ -904,14 +902,13 @@ async def _process_batch_audio_complete( transcription_job = transcription_queue.enqueue( transcribe_full_audio_job, conversation_id, - audio_uuid, version_id, "batch", # trigger job_timeout=1800, # 30 minutes result_ttl=JOB_RESULT_TTL, job_id=transcribe_job_id, description=f"Transcribe batch audio {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id} + meta={'conversation_id': conversation_id, 'client_id': client_id} ) application_logger.info(f"📥 Batch mode: Enqueued transcription job 
{transcription_job.id}") @@ -919,7 +916,6 @@ async def _process_batch_audio_complete( # Enqueue post-conversation processing job chain (depends on transcription) job_ids = start_post_conversation_jobs( conversation_id=conversation_id, - audio_uuid=audio_uuid, user_id=None, # Will be read from conversation in DB by jobs depends_on_job=transcription_job, # Wait for transcription to complete client_id=client_id # Pass client_id for UI tracking diff --git a/backends/advanced/src/advanced_omi_backend/main.py b/backends/advanced/src/advanced_omi_backend/main.py index 5160c230..ee60696f 100644 --- a/backends/advanced/src/advanced_omi_backend/main.py +++ b/backends/advanced/src/advanced_omi_backend/main.py @@ -16,6 +16,7 @@ """ import logging + import uvicorn from advanced_omi_backend.app_factory import create_app diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 3d053536..28a2f0ec 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -12,6 +12,7 @@ import uuid from beanie import Document, Indexed +from pymongo import IndexModel class Conversation(Document): @@ -82,12 +83,18 @@ class MemoryVersion(BaseModel): # Core identifiers conversation_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique conversation identifier") - audio_uuid: Indexed(str) = Field(description="Session/audio identifier (for tracking audio files)") user_id: Indexed(str) = Field(description="User who owns this conversation") client_id: Indexed(str) = Field(description="Client device identifier") - # Legacy audio path field - no longer used, audio stored as MongoDB chunks - audio_path: Optional[str] = Field(None, description="Legacy field, not populated for new conversations") + # External file tracking (for deduplication of imported files) + 
external_source_id: Optional[str] = Field( + None, + description="External file identifier (e.g., Google Drive file_id) for deduplication" + ) + external_source_type: Optional[str] = Field( + None, + description="Type of external source (gdrive, dropbox, s3, etc.)" + ) # MongoDB chunk-based audio storage (new system) audio_chunks_count: Optional[int] = Field( @@ -324,13 +331,13 @@ class Settings: "conversation_id", "user_id", "created_at", - [("user_id", 1), ("created_at", -1)] # Compound index for user queries + [("user_id", 1), ("created_at", -1)], # Compound index for user queries + IndexModel([("external_source_id", 1)], sparse=True) # Sparse index for deduplication ] # Factory function for creating conversations def create_conversation( - audio_uuid: str, user_id: str, client_id: str, conversation_id: Optional[str] = None, @@ -338,12 +345,13 @@ def create_conversation( summary: Optional[str] = None, transcript: Optional[str] = None, segments: Optional[List["Conversation.SpeakerSegment"]] = None, + external_source_id: Optional[str] = None, + external_source_type: Optional[str] = None, ) -> Conversation: """ Factory function to create a new conversation. Args: - audio_uuid: Unique identifier for the audio session user_id: User who owns this conversation client_id: Client device identifier conversation_id: Optional unique conversation identifier (auto-generated if not provided) @@ -351,26 +359,25 @@ def create_conversation( summary: Optional conversation summary transcript: Optional transcript text segments: Optional speaker segments + external_source_id: Optional external file ID for deduplication (e.g., Google Drive file_id) + external_source_type: Optional external source type (gdrive, dropbox, etc.) 
Returns: Conversation instance """ # Build the conversation data conv_data = { - "audio_uuid": audio_uuid, "user_id": user_id, "client_id": client_id, "created_at": datetime.now(), "title": title, "summary": summary, - "transcript": transcript or "", - "segments": segments or [], "transcript_versions": [], "active_transcript_version": None, "memory_versions": [], "active_memory_version": None, - "memories": [], - "memory_count": 0 + "external_source_id": external_source_id, + "external_source_type": external_source_type, } # Only set conversation_id if provided, otherwise let the model auto-generate it diff --git a/backends/advanced/src/advanced_omi_backend/models/user.py b/backends/advanced/src/advanced_omi_backend/models/user.py index b0ced195..7291f9bb 100644 --- a/backends/advanced/src/advanced_omi_backend/models/user.py +++ b/backends/advanced/src/advanced_omi_backend/models/user.py @@ -16,6 +16,7 @@ class UserCreate(BaseUserCreate): """Schema for creating new users.""" display_name: Optional[str] = None + notification_email: Optional[EmailStr] = None is_superuser: Optional[bool] = False @@ -23,6 +24,7 @@ class UserRead(BaseUser[PydanticObjectId]): """Schema for reading user data.""" display_name: Optional[str] = None + notification_email: Optional[EmailStr] = None registered_clients: dict[str, dict] = Field(default_factory=dict) primary_speakers: list[dict] = Field(default_factory=list) @@ -31,6 +33,7 @@ class UserUpdate(BaseUserUpdate): """Schema for updating user data.""" display_name: Optional[str] = None + notification_email: Optional[EmailStr] = None is_superuser: Optional[bool] = None def create_update_dict(self): @@ -38,6 +41,8 @@ def create_update_dict(self): update_dict = super().create_update_dict() if self.display_name is not None: update_dict["display_name"] = self.display_name + if self.notification_email is not None: + update_dict["notification_email"] = self.notification_email return update_dict def create_update_dict_superuser(self): @@ -45,6 
+50,8 @@ def create_update_dict_superuser(self): update_dict = super().create_update_dict_superuser() if self.display_name is not None: update_dict["display_name"] = self.display_name + if self.notification_email is not None: + update_dict["notification_email"] = self.notification_email return update_dict @@ -58,6 +65,7 @@ class User(BeanieBaseUser, Document): ) display_name: Optional[str] = None + notification_email: Optional[EmailStr] = None # Client tracking for audio devices registered_clients: dict[str, dict] = Field(default_factory=dict) # Speaker processing filter configuration diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/README.md b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/README.md new file mode 100644 index 00000000..f1a21a52 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/README.md @@ -0,0 +1,276 @@ +# Email Summarizer Plugin + +Automatically sends email summaries when conversations complete. + +## Features + +- 📧 **Automatic Email Delivery**: Sends emails when conversations finish +- 🤖 **LLM-Powered Summaries**: Uses your configured LLM to generate intelligent summaries +- 🎨 **Beautiful HTML Emails**: Professional-looking emails with proper formatting +- 📱 **Plain Text Fallback**: Ensures compatibility with all email clients +- ⚡ **Async Processing**: Non-blocking email sending +- 🔒 **Secure SMTP**: TLS/SSL encryption support + +## How It Works + +1. User completes a conversation (via OMI device or file upload) +2. Plugin receives `conversation.complete` event +3. Retrieves user email from database +4. Generates LLM summary (2-3 sentences) +5. Formats beautiful HTML and plain text emails +6. Sends email via configured SMTP server + +## Configuration Architecture + +Chronicle uses a clean three-file separation for plugin configuration: + +1. 
**`backends/advanced/.env`** - Secrets only (SMTP credentials, API keys) + - Gitignored for security + - Never commit to version control + +2. **`plugins/email_summarizer/config.yml`** - Plugin-specific settings + - Email content options (subject prefix, max sentences, etc.) + - References environment variables using `${VAR_NAME}` syntax + - Defaults work for most users - typically no editing needed + +3. **`config/plugins.yml`** - Orchestration only + - `enabled` flag + - Event subscriptions + - Trigger conditions + +This separation keeps secrets secure and configuration organized. See [`plugin-configuration.md`](../../../Docs/plugin-configuration.md) for details. + +## Configuration + +### Step 1: Get SMTP Credentials + +#### For Gmail (Recommended for Testing): + +1. **Enable 2-Factor Authentication** on your Google account +2. Go to Google Account → Security → 2-Step Verification +3. Scroll down to **App passwords** +4. Generate an app password for "Mail" +5. Copy the 16-character password (no spaces) + +#### For Other Providers: + +- **Outlook/Hotmail**: smtp.office365.com:587 +- **Yahoo**: smtp.mail.yahoo.com:587 +- **Custom SMTP**: Use your provider's settings + +### Step 2: Configure Environment Variables + +Add to `backends/advanced/.env`: + +```bash +# Email Summarizer Plugin +SMTP_HOST=smtp.gmail.com +SMTP_PORT=587 +SMTP_USERNAME=your-email@gmail.com +SMTP_PASSWORD=your-app-password-here # Gmail App Password (16 chars, no spaces) +SMTP_USE_TLS=true +FROM_EMAIL=noreply@chronicle.ai +FROM_NAME=Chronicle AI +``` + +### Step 3: Enable Plugin + +Add to `config/plugins.yml` (orchestration only): + +```yaml +plugins: + email_summarizer: + enabled: true + events: + - conversation.complete + condition: + type: always +``` + +**That's it!** Plugin-specific settings are already configured in: +- **`plugins/email_summarizer/config.yml`** - Email content options (subject prefix, max sentences, etc.) 
+- **SMTP credentials** are automatically read from `.env` via environment variable references + +You typically don't need to edit `config.yml` - the defaults work for most users. If you want to customize email content settings, see the Configuration Options section below. + +### Step 4: Restart Backend + +```bash +cd backends/advanced +docker compose restart +``` + +## Configuration Options + +All configuration options below are in **`plugins/email_summarizer/config.yml`** and have sensible defaults. You typically don't need to modify these unless you want to customize email content. + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `smtp_host` | string | `smtp.gmail.com` | SMTP server hostname | +| `smtp_port` | integer | `587` | SMTP server port (587 for TLS, 465 for SSL) | +| `smtp_username` | string | **Required** | SMTP authentication username | +| `smtp_password` | string | **Required** | SMTP authentication password | +| `smtp_use_tls` | boolean | `true` | Use STARTTLS encryption | +| `from_email` | string | **Required** | Sender email address | +| `from_name` | string | `Chronicle AI` | Sender display name | +| `subject_prefix` | string | `Conversation Summary` | Email subject prefix | +| `summary_max_sentences` | integer | `3` | Maximum sentences in LLM summary | +| `include_conversation_id` | boolean | `true` | Show conversation ID in email | +| `include_duration` | boolean | `true` | Show conversation duration | + +## Email Template + +### Subject Line +``` +Conversation Summary - Jan 15, 2025 at 10:30 AM +``` + +### Email Body +``` +📋 SUMMARY +[LLM-generated 2-3 sentence summary of key points] + +📝 FULL TRANSCRIPT +[Complete conversation transcript] + +📊 METADATA +Duration: 5m 30s +Conversation ID: 507f1f77bc... 
+``` + +## Testing + +### Test SMTP Connection + +```bash +cd backends/advanced +uv run python -m advanced_omi_backend.plugins.email_summarizer.email_service +``` + +This will: +- Test SMTP connectivity +- Send a test email to your SMTP username +- Verify configuration + +### Test Plugin Integration + +1. Start the backend with plugin enabled +2. Upload a test audio file or use OMI device +3. Wait for conversation to complete +4. Check your email inbox + +## Troubleshooting + +### "Authentication failed" + +**For Gmail:** +- Make sure you're using an **App Password**, not your regular password +- Enable 2-Factor Authentication first +- App password should be 16 characters (xxxx xxxx xxxx xxxx) + +**For other providers:** +- Verify username and password are correct +- Check if "less secure apps" needs to be enabled + +### "Connection timeout" + +- Check `smtp_host` and `smtp_port` are correct +- Verify firewall allows outbound SMTP connections +- Try port 465 with SSL instead of 587 with TLS + +### "No email received" + +- Check user has email configured in database +- Look for plugin logs: `docker compose logs -f chronicle-backend | grep EmailSummarizer` +- Verify plugin is enabled in `plugins.yml` +- Check spam/junk folder + +### "Empty summary" or "LLM error" + +- Verify LLM service is configured and running +- Check LLM API keys are valid +- Plugin will fall back to truncated transcript if LLM fails + +## 🔒 Security Best Practices + +### NEVER Commit Secrets to Version Control + +Always use environment variable references in configuration files: + +```yaml +# plugins/email_summarizer/config.yml +smtp_password: ${SMTP_PASSWORD} # Reference to environment variable +``` + +```bash +# backends/advanced/.env (gitignored) +SMTP_PASSWORD=xnetcqctkkfgzllh # Actual secret stored safely +``` + +### How Configuration Works + +The plugin system automatically: +- ✅ Loads settings from `plugins/email_summarizer/config.yml` +- ✅ Expands `${ENV_VAR}` references from `backends/advanced/.env` +- 
✅ Merges orchestration settings (enabled, events) from `config/plugins.yml` +- ✅ Prevents accidental secret commits (only .env has secrets, and it's gitignored) + +**Always use the setup wizard** instead of manual configuration: +```bash +uv run python backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/setup.py +``` + +### Additional Security Tips + +1. **Never commit SMTP passwords** to git (use .env only) +2. **Use environment variable references** (`${SMTP_PASSWORD}`) in YAML files +3. **Enable TLS/SSL** for encrypted SMTP connections +4. **Gmail App Passwords** are safer than account passwords +5. **Rotate credentials** periodically +6. **Review commits** before pushing to ensure no hardcoded secrets + +## Development + +### File Structure + +``` +plugins/email_summarizer/ +├── __init__.py # Plugin exports +├── plugin.py # Main plugin logic +├── config.yml # Plugin settings (non-secret) +├── email_service.py # SMTP email service +├── templates.py # Email HTML/text templates +└── README.md # This file +``` + +### Key Methods + +- `on_conversation_complete()` - Main event handler +- `_get_user_email()` - Fetch user email from database +- `_generate_summary()` - Generate LLM summary with fallback +- `_format_subject()` - Format email subject line + +### Dependencies + +- `advanced_omi_backend.database` - MongoDB access +- `advanced_omi_backend.llm_client` - LLM generation +- `advanced_omi_backend.plugins.email_summarizer.email_service` - SMTP email sending + +## Future Enhancements + +- [ ] Email templates customization +- [ ] User preference for email frequency +- [ ] Unsubscribe link +- [ ] Email digests (daily/weekly summaries) +- [ ] Rich formatting for action items +- [ ] Attachment support (audio files) +- [ ] Multiple recipient support +- [ ] Email open tracking + +## Support + +- **Issues**: [GitHub Issues](https://github.com/chronicle-ai/chronicle/issues) +- **Discussions**: [GitHub Discussions](https://github.com/chronicle-ai/chronicle/discussions) +- **Documentation**: [Chronicle Docs](https://github.com/chronicle-ai/chronicle) + +## 
License + +MIT License - see project LICENSE file for details. diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/__init__.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/__init__.py new file mode 100644 index 00000000..525acd51 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/__init__.py @@ -0,0 +1,9 @@ +""" +Email Summarizer Plugin for Chronicle. + +Automatically sends email summaries when conversations complete. +""" + +from .plugin import EmailSummarizerPlugin + +__all__ = ['EmailSummarizerPlugin'] diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/config.yml b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/config.yml new file mode 100644 index 00000000..9f4ed8f6 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/config.yml @@ -0,0 +1,23 @@ +# Email Summarizer Plugin Configuration +# +# This file contains non-secret configuration for the email summarizer plugin. 
+# Secrets (SMTP credentials) are stored in backends/advanced/.env +# Plugin orchestration (enabled, events) is in config/plugins.yml + +# Email content settings +subject_prefix: "Conversation Summary" +summary_max_sentences: 3 +include_conversation_id: true +include_duration: true + +# SMTP Configuration (reads from .env) +# These use environment variable references ${VAR_NAME} +smtp_host: ${SMTP_HOST} +smtp_port: ${SMTP_PORT:-587} +smtp_username: ${SMTP_USERNAME} +smtp_password: ${SMTP_PASSWORD} +smtp_use_tls: ${SMTP_USE_TLS:-true} + +# Email sender configuration +from_email: ${FROM_EMAIL} +from_name: ${FROM_NAME:-Chronicle AI} diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/email_service.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/email_service.py new file mode 100644 index 00000000..be2d389e --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/email_service.py @@ -0,0 +1,219 @@ +""" +SMTP Email Service for Chronicle. + +Provides email sending functionality via SMTP protocol with support for: +- HTML and plain text emails +- TLS/SSL encryption +- Gmail and other SMTP providers +- Async implementation +""" +import asyncio +import logging +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +class SMTPEmailService: + """SMTP email service for sending emails via SMTP protocol.""" + + def __init__(self, config: Dict[str, Any]): + """ + Initialize SMTP email service with configuration. 
+ + Args: + config: SMTP configuration containing: + - smtp_host: SMTP server hostname + - smtp_port: SMTP server port (default: 587) + - smtp_username: SMTP username + - smtp_password: SMTP password + - smtp_use_tls: Whether to use TLS (default: True) + - from_email: Sender email address + - from_name: Sender display name (default: 'Chronicle AI') + """ + self.host = config.get('smtp_host') + self.port = config.get('smtp_port', 587) + self.username = config.get('smtp_username') + self.password = config.get('smtp_password') + self.use_tls = config.get('smtp_use_tls', True) + self.from_email = config.get('from_email') + self.from_name = config.get('from_name', 'Chronicle AI') + + # Validate required configuration + if not all([self.host, self.username, self.password, self.from_email]): + raise ValueError( + "SMTP configuration incomplete. Required: smtp_host, smtp_username, " + "smtp_password, from_email" + ) + + logger.info( + f"SMTP Email Service initialized: {self.username}@{self.host}:{self.port} " + f"(TLS: {self.use_tls})" + ) + + async def send_email( + self, + to_email: str, + subject: str, + body_text: str, + body_html: Optional[str] = None + ) -> bool: + """ + Send email via SMTP with HTML/text support. 
+ + Args: + to_email: Recipient email address + subject: Email subject line + body_text: Plain text email body + body_html: Optional HTML email body + + Returns: + True if email sent successfully, False otherwise + """ + try: + # Create message container + msg = MIMEMultipart('alternative') + msg['Subject'] = subject + msg['From'] = f"{self.from_name} <{self.from_email}>" + msg['To'] = to_email + + # Attach plain text version + text_part = MIMEText(body_text, 'plain') + msg.attach(text_part) + + # Attach HTML version if provided + if body_html: + html_part = MIMEText(body_html, 'html') + msg.attach(html_part) + + # Send email asynchronously (run in thread pool to avoid blocking) + await asyncio.to_thread(self._send_smtp, msg, to_email) + + logger.info(f"✅ Email sent successfully to {to_email}: {subject}") + return True + + except Exception as e: + logger.error(f"Failed to send email to {to_email}: {e}", exc_info=True) + return False + + def _send_smtp(self, msg: MIMEMultipart, to_email: str) -> None: + """ + Internal method to send email via SMTP (blocking). + + Args: + msg: MIME message to send + to_email: Recipient email address + + Raises: + Exception: If SMTP sending fails + """ + # Connect to SMTP server + if self.use_tls: + # Use STARTTLS (most common for port 587) + smtp_server = smtplib.SMTP(self.host, self.port, timeout=30) + smtp_server.ehlo() + smtp_server.starttls() + smtp_server.ehlo() + else: + # Direct connection (for port 465 SSL or no encryption) + smtp_server = smtplib.SMTP(self.host, self.port, timeout=30) + + try: + # Login and send + smtp_server.login(self.username, self.password) + smtp_server.send_message(msg) + logger.debug(f"SMTP send completed for {to_email}") + finally: + smtp_server.quit() + + async def test_connection(self) -> bool: + """ + Test SMTP connectivity and authentication. 
+ + Returns: + True if connection successful, False otherwise + """ + try: + await asyncio.to_thread(self._test_smtp_connection) + logger.info(f"✅ SMTP connection test successful: {self.username}@{self.host}") + return True + except Exception as e: + logger.error(f"SMTP connection test failed: {e}", exc_info=True) + return False + + def _test_smtp_connection(self) -> None: + """ + Internal method to test SMTP connection (blocking). + + Raises: + Exception: If connection fails + """ + if self.use_tls: + smtp_server = smtplib.SMTP(self.host, self.port, timeout=10) + smtp_server.ehlo() + smtp_server.starttls() + smtp_server.ehlo() + else: + smtp_server = smtplib.SMTP(self.host, self.port, timeout=10) + + try: + smtp_server.login(self.username, self.password) + logger.debug("SMTP authentication successful") + finally: + smtp_server.quit() + + +# Test script for development/debugging +async def main(): + """Test the SMTP email service.""" + import os + from dotenv import load_dotenv + + load_dotenv() + + config = { + 'smtp_host': os.getenv('SMTP_HOST', 'smtp.gmail.com'), + 'smtp_port': int(os.getenv('SMTP_PORT', 587)), + 'smtp_username': os.getenv('SMTP_USERNAME'), + 'smtp_password': os.getenv('SMTP_PASSWORD'), + 'smtp_use_tls': os.getenv('SMTP_USE_TLS', 'true').lower() == 'true', + 'from_email': os.getenv('FROM_EMAIL', 'noreply@chronicle.ai'), + 'from_name': os.getenv('FROM_NAME', 'Chronicle AI'), + } + + try: + service = SMTPEmailService(config) + + # Test connection + print("Testing SMTP connection...") + if await service.test_connection(): + print("✅ Connection test passed") + else: + print("❌ Connection test failed") + return + + # Send test email + test_email = config['smtp_username'] # Send to self + print(f"\nSending test email to {test_email}...") + + success = await service.send_email( + to_email=test_email, + subject="Chronicle Email Service Test", + body_text="This is a test email from Chronicle Email Service.\n\nIf you received this, the email service is 
working correctly!", + body_html="
This is a test email from Chronicle Email Service.
If you received this, the email service is working correctly!
" + ) + + if success: + print("✅ Test email sent successfully") + else: + print("❌ Failed to send test email") + + except Exception as e: + print(f"❌ Error: {e}") + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/plugin.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/plugin.py new file mode 100644 index 00000000..02521d29 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/plugin.py @@ -0,0 +1,288 @@ +""" +Email Summarizer Plugin for Chronicle. + +Automatically sends email summaries when conversations complete. +""" +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional + +from advanced_omi_backend.database import get_database +from advanced_omi_backend.llm_client import async_generate + +from ..base import BasePlugin, PluginContext, PluginResult +from .email_service import SMTPEmailService +from .templates import format_html_email, format_text_email + +logger = logging.getLogger(__name__) + + +class EmailSummarizerPlugin(BasePlugin): + """ + Plugin for sending email summaries when conversations complete. + + Subscribes to conversation.complete events and: + 1. Retrieves user email from database + 2. Generates LLM summary of the conversation + 3. Formats HTML and plain text emails + 4. Sends email via SMTP + + Configuration (config/plugins.yml): + enabled: true + events: + - conversation.complete + condition: + type: always + smtp_host: smtp.gmail.com + smtp_port: 587 + smtp_username: ${SMTP_USERNAME} + smtp_password: ${SMTP_PASSWORD} + smtp_use_tls: true + from_email: noreply@chronicle.ai + from_name: Chronicle AI + subject_prefix: "Conversation Summary" + summary_max_sentences: 3 + """ + + SUPPORTED_ACCESS_LEVELS: List[str] = ['conversation'] + + def __init__(self, config: Dict[str, Any]): + """ + Initialize Email Summarizer plugin. 
+ + Args: + config: Plugin configuration from config/plugins.yml + """ + super().__init__(config) + + self.subject_prefix = config.get('subject_prefix', 'Conversation Summary') + self.summary_max_sentences = config.get('summary_max_sentences', 3) + self.include_conversation_id = config.get('include_conversation_id', True) + self.include_duration = config.get('include_duration', True) + + # Email service will be initialized in initialize() + self.email_service: Optional[SMTPEmailService] = None + + # MongoDB database handle + self.db = None + + async def initialize(self): + """ + Initialize plugin resources. + + Sets up SMTP email service and MongoDB connection. + + Raises: + ValueError: If SMTP configuration is incomplete + Exception: If email service initialization fails + """ + if not self.enabled: + logger.info("Email Summarizer plugin is disabled, skipping initialization") + return + + logger.info("Initializing Email Summarizer plugin...") + + # Initialize SMTP email service + try: + smtp_config = { + 'smtp_host': self.config.get('smtp_host'), + 'smtp_port': self.config.get('smtp_port', 587), + 'smtp_username': self.config.get('smtp_username'), + 'smtp_password': self.config.get('smtp_password'), + 'smtp_use_tls': self.config.get('smtp_use_tls', True), + 'from_email': self.config.get('from_email'), + 'from_name': self.config.get('from_name', 'Chronicle AI'), + } + + self.email_service = SMTPEmailService(smtp_config) + + # Test SMTP connection + logger.info("Testing SMTP connectivity...") + if await self.email_service.test_connection(): + logger.info("✅ SMTP connection test successful") + else: + raise Exception("SMTP connection test failed") + + except Exception as e: + logger.error(f"Failed to initialize email service: {e}") + raise + + # Get MongoDB database handle + self.db = get_database() + logger.info("✅ Email Summarizer plugin initialized successfully") + + async def cleanup(self): + """Clean up plugin resources.""" + logger.info("Email Summarizer plugin 
cleanup complete") + + async def on_conversation_complete(self, context: PluginContext) -> Optional[PluginResult]: + """ + Send email summary when conversation completes. + + Args: + context: Plugin context with conversation data + - conversation: dict - Full conversation data + - transcript: str - Complete transcript + - duration: float - Conversation duration + - conversation_id: str - Conversation identifier + + Returns: + PluginResult with success status and message + """ + try: + logger.info(f"Processing conversation complete event for user: {context.user_id}") + + # Extract conversation data + conversation = context.data.get('conversation', {}) + transcript = context.data.get('transcript', '') + duration = context.data.get('duration', 0) + conversation_id = context.data.get('conversation_id', 'unknown') + created_at = conversation.get('created_at') + + # Validate transcript exists + if not transcript or transcript.strip() == '': + logger.warning(f"Empty transcript for conversation {conversation_id}, skipping email") + return PluginResult( + success=False, + message="Skipped: Empty transcript" + ) + + # Get user email from database + user_email = await self._get_user_email(context.user_id) + if not user_email: + logger.warning(f"No email found for user {context.user_id}, cannot send summary") + return PluginResult( + success=False, + message=f"No email configured for user {context.user_id}" + ) + + # Generate LLM summary + summary = await self._generate_summary(transcript) + + # Format email subject and body + subject = self._format_subject(created_at) + body_html = format_html_email( + summary=summary, + transcript=transcript, + conversation_id=conversation_id, + duration=duration, + created_at=created_at + ) + body_text = format_text_email( + summary=summary, + transcript=transcript, + conversation_id=conversation_id, + duration=duration, + created_at=created_at + ) + + # Send email + success = await self.email_service.send_email( + to_email=user_email, + 
subject=subject, + body_text=body_text, + body_html=body_html + ) + + if success: + logger.info(f"✅ Email summary sent to {user_email} for conversation {conversation_id}") + return PluginResult( + success=True, + message=f"Email sent to {user_email}", + data={'recipient': user_email, 'conversation_id': conversation_id} + ) + else: + logger.error(f"Failed to send email to {user_email}") + return PluginResult( + success=False, + message=f"Failed to send email to {user_email}" + ) + + except Exception as e: + logger.error(f"Error in email summarizer plugin: {e}", exc_info=True) + return PluginResult( + success=False, + message=f"Error: {str(e)}" + ) + + async def _get_user_email(self, user_id: str) -> Optional[str]: + """ + Get notification email from user. + + Args: + user_id: User identifier (MongoDB ObjectId) + + Returns: + User's notification_email, or None if not set + """ + try: + from bson import ObjectId + + # Query users collection + user = await self.db['users'].find_one({'_id': ObjectId(user_id)}) + + if not user: + logger.warning(f"User {user_id} not found") + return None + + notification_email = user.get('notification_email') + + if not notification_email: + logger.warning(f"User {user_id} has no notification_email set") + return None + + logger.debug(f"Sending notification to {notification_email} for user {user_id}") + return notification_email + + except Exception as e: + logger.error(f"Error fetching user email: {e}", exc_info=True) + return None + + async def _generate_summary(self, transcript: str) -> str: + """ + Generate LLM summary of the conversation. + + Args: + transcript: Full conversation transcript + + Returns: + Generated summary (2-3 sentences) + """ + try: + prompt = ( + f"Summarize this conversation in {self.summary_max_sentences} sentences or less. " + f"Focus on key points, main topics discussed, and any action items or decisions. 
" + f"Be concise and clear.\n\n" + f"Conversation:\n{transcript}" + ) + + logger.debug("Generating LLM summary...") + summary = await async_generate(prompt) + + if not summary or summary.strip() == '': + raise ValueError("LLM returned empty summary") + + logger.info("✅ LLM summary generated successfully") + return summary.strip() + + except Exception as e: + logger.error(f"Failed to generate LLM summary: {e}", exc_info=True) + # Fallback: return first 300 characters of transcript + logger.warning("Using fallback: truncated transcript") + return transcript[:300] + "..." if len(transcript) > 300 else transcript + + def _format_subject(self, created_at: Optional[datetime] = None) -> str: + """ + Format email subject line. + + Args: + created_at: Conversation creation timestamp + + Returns: + Formatted subject line + """ + if created_at: + date_str = created_at.strftime("%b %d, %Y at %I:%M %p") + return f"{self.subject_prefix} - {date_str}" + else: + return self.subject_prefix diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/setup.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/setup.py new file mode 100755 index 00000000..728ae607 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/setup.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +""" +Email Summarizer Plugin Setup Wizard + +Configures SMTP credentials and plugin settings. 
+Follows Chronicle's clean configuration architecture: +- Secrets → backends/advanced/.env +- Non-secret settings → plugins/email_summarizer/config.yml +- Orchestration → config/plugins.yml +""" + +import shutil +import sys +from datetime import datetime +from pathlib import Path + +import yaml +from dotenv import set_key +from rich.console import Console +from rich.prompt import Confirm + +# Add repo root to path for setup_utils import +project_root = Path(__file__).resolve().parents[6] +sys.path.insert(0, str(project_root)) + +from setup_utils import ( + prompt_with_existing_masked, + prompt_value +) + +console = Console() + + +def update_plugins_yml_orchestration(): + """ + Update config/plugins.yml with orchestration settings only. + Plugin-specific settings are in plugins/email_summarizer/config.yml. + This follows Chronicle's three-file configuration architecture. + """ + plugins_yml_path = project_root / "config" / "plugins.yml" + + # Load existing or create from template + if plugins_yml_path.exists(): + with open(plugins_yml_path, 'r') as f: + config = yaml.safe_load(f) or {} + else: + # Copy from template + template_path = project_root / "config" / "plugins.yml.template" + if template_path.exists(): + with open(template_path, 'r') as f: + config = yaml.safe_load(f) or {} + else: + config = {'plugins': {}} + + # Ensure structure exists + if 'plugins' not in config: + config['plugins'] = {} + + # Only orchestration settings in config/plugins.yml + # Plugin-specific settings are in plugins/email_summarizer/config.yml + plugin_config = { + 'enabled': False, # Let user enable manually or prompt + 'events': ['conversation.complete'], + 'condition': {'type': 'always'} + } + + # Update or create plugin entry + config['plugins']['email_summarizer'] = plugin_config + + # Backup existing file + if plugins_yml_path.exists(): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = plugins_yml_path.parent / f"plugins.yml.backup.{timestamp}" + 
shutil.copy(plugins_yml_path, backup_path) + console.print(f"[dim]Backed up existing plugins.yml to {backup_path.name}[/dim]") + + # Write updated config + plugins_yml_path.parent.mkdir(parents=True, exist_ok=True) + with open(plugins_yml_path, 'w') as f: + yaml.dump(config, f, default_flow_style=False, sort_keys=False) + + console.print("[green]✅ Updated config/plugins.yml (orchestration only)[/green]") + + return plugins_yml_path + + +def main(): + """Interactive setup for Email Summarizer plugin""" + console.print("\n📧 [bold cyan]Email Summarizer Plugin Setup[/bold cyan]") + console.print("This plugin sends email summaries when conversations complete.\n") + + # Path to main backend .env file + env_path = str(project_root / "backends" / "advanced" / ".env") + + # SMTP Configuration + console.print("[bold]SMTP Configuration[/bold]") + console.print("[dim]For Gmail: Use App Password (Settings > Security > 2FA > App Passwords)[/dim]\n") + + smtp_host = prompt_with_existing_masked( + prompt_text="SMTP Host", + env_file_path=env_path, + env_key="SMTP_HOST", + placeholders=['your-smtp-host-here'], + is_password=False, + default="smtp.gmail.com" + ) + + smtp_port = prompt_value("SMTP Port", default="587") + + smtp_username = prompt_with_existing_masked( + prompt_text="SMTP Username (your email)", + env_file_path=env_path, + env_key="SMTP_USERNAME", + placeholders=['your-email@example.com'], + is_password=False + ) + + smtp_password = prompt_with_existing_masked( + prompt_text="SMTP Password (App Password)", + env_file_path=env_path, + env_key="SMTP_PASSWORD", + placeholders=['your-password-here', 'your-app-password-here'], + is_password=True # Shows masked existing value + ) + + # Remove spaces from app password (Google adds spaces when copying) + smtp_password = smtp_password.replace(" ", "") + + smtp_use_tls = prompt_value("Use TLS? 
(true/false)", default="true") + + # Email sender configuration + from_email = prompt_with_existing_masked( + prompt_text="From Email", + env_file_path=env_path, + env_key="FROM_EMAIL", + placeholders=['noreply@example.com'], + is_password=False, + default=smtp_username # Default to SMTP username + ) + + from_name = prompt_value("From Name", default="Chronicle AI") + + # Save secrets to .env + console.print("\n💾 [bold]Saving credentials to .env...[/bold]") + + set_key(env_path, "SMTP_HOST", smtp_host) + set_key(env_path, "SMTP_PORT", smtp_port) + set_key(env_path, "SMTP_USERNAME", smtp_username) + set_key(env_path, "SMTP_PASSWORD", smtp_password) + set_key(env_path, "SMTP_USE_TLS", smtp_use_tls) + set_key(env_path, "FROM_EMAIL", from_email) + set_key(env_path, "FROM_NAME", from_name) + + console.print("[green]✅ SMTP credentials saved to backends/advanced/.env[/green]") + + # Auto-update plugins.yml with orchestration settings only + console.print("\n📝 [bold]Updating plugin configuration...[/bold]") + plugins_yml_path = update_plugins_yml_orchestration() + + # Prompt to enable plugin + enable_now = Confirm.ask("\nEnable email_summarizer plugin now?", default=True) + if enable_now: + with open(plugins_yml_path, 'r') as f: + config = yaml.safe_load(f) + config['plugins']['email_summarizer']['enabled'] = True + with open(plugins_yml_path, 'w') as f: + yaml.dump(config, f, default_flow_style=False, sort_keys=False) + console.print("[green]✅ Plugin enabled in config/plugins.yml[/green]") + + console.print("\n[bold cyan]✅ Email Summarizer configured successfully![/bold cyan]") + console.print("\n[bold]Configuration saved to:[/bold]") + console.print(" • [green]backends/advanced/.env[/green] - SMTP credentials (secrets)") + console.print(" • [green]config/plugins.yml[/green] - Plugin orchestration (enabled, events)") + console.print(" • [green]plugins/email_summarizer/config.yml[/green] - Plugin settings (already configured)") + console.print() + + if not enable_now: + 
console.print("[bold]To enable later:[/bold]") + console.print(" Edit config/plugins.yml and set: enabled: true") + console.print() + + console.print("[bold]Restart backend to apply:[/bold]") + console.print(" [dim]cd backends/advanced && docker compose restart[/dim]") + console.print() + console.print("[yellow]⚠️ SECURITY: Never commit secrets to git![/yellow]") + console.print("[yellow] • Secrets go in backends/advanced/.env (gitignored)[/yellow]") + console.print("[yellow] • Config files use ${ENV_VAR} references only[/yellow]") + + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + console.print("\n[yellow]Setup cancelled by user[/yellow]") + sys.exit(1) + except Exception as e: + console.print(f"\n[red]Error during setup: {e}[/red]") + import traceback + traceback.print_exc() + sys.exit(1) diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/templates.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/templates.py new file mode 100644 index 00000000..9f99e5cb --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/templates.py @@ -0,0 +1,258 @@ +""" +Email templates for the Email Summarizer plugin. + +Provides HTML and plain text email templates. +""" +import html +from datetime import datetime +from typing import Optional + + +def format_duration(seconds: float) -> str: + """ + Format duration in seconds to human-readable format. + + Args: + seconds: Duration in seconds + + Returns: + Formatted duration (e.g., "5m 30s", "1h 15m") + """ + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + + if hours > 0: + return f"{hours}h {minutes}m" + elif minutes > 0: + return f"{minutes}m {secs}s" + else: + return f"{secs}s" + + +def format_html_email( + summary: str, + transcript: str, + conversation_id: str, + duration: float, + created_at: Optional[datetime] = None +) -> str: + """ + Format HTML email template. 
+ + Args: + summary: LLM-generated summary + transcript: Full conversation transcript + conversation_id: Conversation identifier + duration: Conversation duration in seconds + created_at: Conversation creation timestamp + + Returns: + HTML email body + """ + formatted_duration = format_duration(duration) + date_str = created_at.strftime("%B %d, %Y at %I:%M %p") if created_at else "N/A" + + # Escape HTML to prevent XSS attacks + summary_escaped = html.escape(summary, quote=True) + transcript_escaped = html.escape(transcript, quote=True) + + # Format transcript with line breaks (after escaping) + transcript_html = transcript_escaped.replace('\n', '{summary_escaped}
+