📋 Summary
+{summary}
+From e2fe9c97811c930dbbfb8f9bade1bdb8cf3a2a33 Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Sat, 17 Jan 2026 01:55:42 +0000 Subject: [PATCH 01/21] Add Email Summarizer Plugin and SMTP Email Service - Introduced the Email Summarizer Plugin that automatically sends email summaries upon conversation completion. - Implemented SMTP Email Service for sending emails, supporting HTML and plain text formats with TLS/SSL encryption. - Added configuration options for SMTP settings in the .env.template and plugins.yml.template. - Created comprehensive documentation for plugin development and usage, including a new plugin generation script. - Enhanced testing coverage for the Email Summarizer Plugin and SMTP Email Service to ensure reliability and functionality. --- backends/advanced/.env.template | 18 + .../advanced/docs/plugin-development-guide.md | 776 ++++++++++++++++++ backends/advanced/scripts/create_plugin.py | 431 ++++++++++ .../plugins/email_summarizer/README.md | 233 ++++++ .../plugins/email_summarizer/__init__.py | 9 + .../plugins/email_summarizer/email_service.py | 219 +++++ .../plugins/email_summarizer/plugin.py | 281 +++++++ .../plugins/email_summarizer/templates.py | 253 ++++++ .../services/plugin_service.py | 140 +++- backends/advanced/tests/test_email_service.py | 210 +++++ .../tests/test_email_summarizer_plugin.py | 284 +++++++ backends/advanced/uv.lock | 21 + config/plugins.yml.template | 42 +- 13 files changed, 2897 insertions(+), 20 deletions(-) create mode 100644 backends/advanced/docs/plugin-development-guide.md create mode 100755 backends/advanced/scripts/create_plugin.py create mode 100644 backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/README.md create mode 100644 backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/__init__.py create mode 100644 backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/email_service.py create mode 100644 backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/plugin.py create mode 100644 backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/templates.py create mode 100644 backends/advanced/tests/test_email_service.py create mode 100644 backends/advanced/tests/test_email_summarizer_plugin.py diff --git a/backends/advanced/.env.template b/backends/advanced/.env.template index 88617688..8f2a0dbf 100644 --- a/backends/advanced/.env.template +++ b/backends/advanced/.env.template @@ -60,3 +60,21 @@ TS_AUTHKEY= # Home Assistant long-lived access token (for voice control plugin) HA_TOKEN= + +# ======================================== +# Email Summarizer Plugin (Optional) +# ======================================== + +# SMTP Configuration for sending email summaries +# For Gmail: Use App Password (requires 2FA enabled) +# 1. Go to Google Account → Security → 2-Step Verification +# 2. Scroll to "App passwords" → Generate password for "Mail" +# 3. Use the 16-character password below (no spaces) + +SMTP_HOST=smtp.gmail.com +SMTP_PORT=587 +SMTP_USERNAME=your-email@gmail.com +SMTP_PASSWORD=your-app-password-here +SMTP_USE_TLS=true +FROM_EMAIL=noreply@chronicle.ai +FROM_NAME=Chronicle AI diff --git a/backends/advanced/docs/plugin-development-guide.md b/backends/advanced/docs/plugin-development-guide.md new file mode 100644 index 00000000..17c53b4a --- /dev/null +++ b/backends/advanced/docs/plugin-development-guide.md @@ -0,0 +1,776 @@ +# Chronicle Plugin Development Guide + +A comprehensive guide to creating custom plugins for Chronicle. + +## Table of Contents + +1. [Introduction](#introduction) +2. [Quick Start](#quick-start) +3. [Plugin Architecture](#plugin-architecture) +4. [Event Types](#event-types) +5. [Creating Your First Plugin](#creating-your-first-plugin) +6. [Configuration](#configuration) +7. [Testing Plugins](#testing-plugins) +8. [Best Practices](#best-practices) +9. [Examples](#examples) +10. [Troubleshooting](#troubleshooting) + +## Introduction + +Chronicle's plugin system allows you to extend functionality by subscribing to events and executing custom logic. Plugins are: + +- **Event-driven**: React to transcripts, conversations, or memory processing +- **Auto-discovered**: Drop plugins into the `plugins/` directory +- **Configurable**: YAML-based configuration with environment variable support +- **Isolated**: Each plugin runs independently with proper error handling + +### Plugin Types + +- **Core Plugins**: Built-in plugins (`homeassistant`, `test_event`) +- **Community Plugins**: Auto-discovered plugins in `plugins/` directory + +## Quick Start + +### 1. Generate Plugin Boilerplate + +```bash +cd backends/advanced +uv run python scripts/create_plugin.py my_awesome_plugin +``` + +This creates: +``` +plugins/my_awesome_plugin/ +├── __init__.py # Plugin exports +├── plugin.py # Main plugin logic +└── README.md # Plugin documentation +``` + +### 2. Implement Plugin Logic + +Edit `plugins/my_awesome_plugin/plugin.py`: + +```python +async def on_conversation_complete(self, context: PluginContext) -> Optional[PluginResult]: + """Handle conversation completion.""" + transcript = context.data.get('transcript', '') + + # Your custom logic here + print(f"Processing: {transcript}") + + return PluginResult(success=True, message="Processing complete") +``` + +### 3. Configure Plugin + +Add to `config/plugins.yml`: + +```yaml +plugins: + my_awesome_plugin: + enabled: true + events: + - conversation.complete + condition: + type: always +``` + +### 4. Restart Backend + +```bash +cd backends/advanced +docker compose restart +``` + +Your plugin will be auto-discovered and loaded! + +## Plugin Architecture + +### Base Plugin Class + +All plugins inherit from `BasePlugin`: + +```python +from advanced_omi_backend.plugins.base import BasePlugin, PluginContext, PluginResult + +class MyPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['conversation'] # Which events you support + + async def initialize(self): + """Initialize resources (called on app startup)""" + pass + + async def cleanup(self): + """Clean up resources (called on app shutdown)""" + pass + + async def on_conversation_complete(self, context: PluginContext): + """Handle conversation.complete events""" + pass +``` + +### Plugin Context + +Context passed to plugin methods: + +```python +@dataclass +class PluginContext: + user_id: str # User identifier + event: str # Event name (e.g., "conversation.complete") + data: Dict[str, Any] # Event-specific data + metadata: Dict[str, Any] # Additional metadata +``` + +### Plugin Result + +Return value from plugin methods: + +```python +@dataclass +class PluginResult: + success: bool # Whether operation succeeded + data: Optional[Dict[str, Any]] # Optional result data + message: Optional[str] # Optional status message + should_continue: bool # Whether to continue normal processing (default: True) +``` + +## Event Types + +### 1. Transcript Events (`transcript.streaming`) + +**When**: Real-time transcript segments arrive from WebSocket +**Context Data**: +- `transcript` (str): The transcript text +- `segment_id` (str): Unique segment identifier +- `conversation_id` (str): Current conversation ID + +**Use Cases**: +- Wake word detection +- Real-time command processing +- Live transcript analysis + +**Example**: +```python +async def on_transcript(self, context: PluginContext): + transcript = context.data.get('transcript', '') + if 'urgent' in transcript.lower(): + await self.send_notification(transcript) +``` + +### 2. Conversation Events (`conversation.complete`) + +**When**: Conversation processing finishes +**Context Data**: +- `conversation` (dict): Full conversation data +- `transcript` (str): Complete transcript +- `duration` (float): Conversation duration in seconds +- `conversation_id` (str): Conversation identifier + +**Use Cases**: +- Email summaries +- Analytics tracking +- External integrations +- Conversation archiving + +**Example**: +```python +async def on_conversation_complete(self, context: PluginContext): + conversation = context.data.get('conversation', {}) + duration = context.data.get('duration', 0) + + if duration > 300: # 5 minutes + await self.archive_long_conversation(conversation) +``` + +### 3. Memory Events (`memory.processed`) + +**When**: Memory extraction finishes +**Context Data**: +- `memories` (list): Extracted memories +- `conversation` (dict): Source conversation +- `memory_count` (int): Number of memories created +- `conversation_id` (str): Conversation identifier + +**Use Cases**: +- Memory indexing +- Knowledge graph updates +- Memory notifications +- Analytics + +**Example**: +```python +async def on_memory_processed(self, context: PluginContext): + memories = context.data.get('memories', []) + + for memory in memories: + await self.index_memory(memory) +``` + +## Creating Your First Plugin + +### Step 1: Generate Boilerplate + +```bash +uv run python scripts/create_plugin.py todo_extractor +``` + +### Step 2: Define Plugin Logic + +```python +""" +Todo Extractor Plugin - Extracts action items from conversations. +""" +import logging +import re +from typing import Any, Dict, List, Optional + +from ..base import BasePlugin, PluginContext, PluginResult + +logger = logging.getLogger(__name__) + + +class TodoExtractorPlugin(BasePlugin): + """Extract and save action items from conversations.""" + + SUPPORTED_ACCESS_LEVELS = ['conversation'] + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self.todo_patterns = [ + r'I need to (.+)', + r'I should (.+)', + r'TODO: (.+)', + r'reminder to (.+)', + ] + + async def initialize(self): + if not self.enabled: + return + + logger.info("TodoExtractor plugin initialized") + + async def on_conversation_complete(self, context: PluginContext): + try: + transcript = context.data.get('transcript', '') + todos = self._extract_todos(transcript) + + if todos: + await self._save_todos(context.user_id, todos) + + return PluginResult( + success=True, + message=f"Extracted {len(todos)} action items", + data={'todos': todos} + ) + + return PluginResult(success=True, message="No action items found") + + except Exception as e: + logger.error(f"Error extracting todos: {e}") + return PluginResult(success=False, message=str(e)) + + def _extract_todos(self, transcript: str) -> List[str]: + """Extract todo items from transcript.""" + todos = [] + + for pattern in self.todo_patterns: + matches = re.findall(pattern, transcript, re.IGNORECASE) + todos.extend(matches) + + return list(set(todos)) # Remove duplicates + + async def _save_todos(self, user_id: str, todos: List[str]): + """Save todos to database or external service.""" + from advanced_omi_backend.database import get_database + + db = get_database() + for todo in todos: + await db['todos'].insert_one({ + 'user_id': user_id, + 'task': todo, + 'completed': False, + 'created_at': datetime.utcnow() + }) +``` + +### Step 3: Configure Plugin + +`config/plugins.yml`: + +```yaml +plugins: + todo_extractor: + enabled: true + events: + - conversation.complete + condition: + type: always +``` + +### Step 4: Test Plugin + +1. Restart backend: `docker compose restart` +2. Create a conversation with phrases like "I need to buy milk" +3. Check logs: `docker compose logs -f chronicle-backend | grep TodoExtractor` +4. Verify todos in database + +## Configuration + +### YAML Configuration + +`config/plugins.yml`: + +```yaml +plugins: + my_plugin: + # Basic Configuration + enabled: true # Enable/disable plugin + + # Event Subscriptions + events: + - conversation.complete + - memory.processed + + # Execution Conditions + condition: + type: always # always, wake_word, regex + # wake_words: ["hey assistant"] # For wake_word type + # pattern: "urgent" # For regex type + + # Custom Configuration + api_url: ${MY_API_URL} # Environment variable + timeout: 30 + max_retries: 3 +``` + +### Environment Variables + +Use `${VAR_NAME}` syntax: + +```yaml +api_key: ${MY_API_KEY} +base_url: ${BASE_URL:-http://localhost:8000} # With default +``` + +Add to `.env`: + +```bash +MY_API_KEY=your-key-here +BASE_URL=https://api.example.com +``` + +### Condition Types + +**Always Execute**: +```yaml +condition: + type: always +``` + +**Wake Word** (transcript events only): +```yaml +condition: + type: wake_word + wake_words: + - hey assistant + - computer +``` + +**Regex Pattern**: +```yaml +condition: + type: regex + pattern: "urgent|important" +``` + +## Testing Plugins + +### Unit Tests + +`tests/test_my_plugin.py`: + +```python +import pytest +from plugins.my_plugin import MyPlugin +from plugins.base import PluginContext + +class TestMyPlugin: + def test_plugin_initialization(self): + config = {'enabled': True, 'events': ['conversation.complete']} + plugin = MyPlugin(config) + assert plugin.enabled is True + + @pytest.mark.asyncio + async def test_conversation_processing(self): + plugin = MyPlugin({'enabled': True}) + await plugin.initialize() + + context = PluginContext( + user_id='test-user', + event='conversation.complete', + data={'transcript': 'Test transcript'} + ) + + result = await plugin.on_conversation_complete(context) + assert result.success is True +``` + +### Integration Testing + +1. **Enable Test Plugin**: +```yaml +test_event: + enabled: true + events: + - conversation.complete +``` + +2. **Check Logs**: +```bash +docker compose logs -f | grep "test_event" +``` + +3. **Upload Test Audio**: +```bash +curl -X POST http://localhost:8000/api/process-audio-files \ + -H "Authorization: Bearer $TOKEN" \ + -F "files=@test.wav" +``` + +### Manual Testing Checklist + +- [ ] Plugin loads without errors +- [ ] Configuration validates correctly +- [ ] Events trigger plugin execution +- [ ] Plugin logic executes successfully +- [ ] Errors are handled gracefully +- [ ] Logs provide useful information + +## Best Practices + +### 1. Error Handling + +Always wrap logic in try-except: + +```python +async def on_conversation_complete(self, context): + try: + # Your logic + result = await self.process(context) + return PluginResult(success=True, data=result) + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + return PluginResult(success=False, message=str(e)) +``` + +### 2. Logging + +Use appropriate log levels: + +```python +logger.debug("Detailed debug information") +logger.info("Important milestones") +logger.warning("Non-critical issues") +logger.error("Errors that need attention") +``` + +### 3. Resource Management + +Clean up in `cleanup()`: + +```python +async def initialize(self): + self.client = ExternalClient() + await self.client.connect() + +async def cleanup(self): + if self.client: + await self.client.disconnect() +``` + +### 4. Configuration Validation + +Validate in `initialize()`: + +```python +async def initialize(self): + if not self.config.get('api_key'): + raise ValueError("API key is required") + + if self.config.get('timeout', 0) <= 0: + raise ValueError("Timeout must be positive") +``` + +### 5. Async Best Practices + +Use `asyncio.to_thread()` for blocking operations: + +```python +import asyncio + +async def my_method(self): + # Run blocking operation in thread pool + result = await asyncio.to_thread(blocking_function, arg1, arg2) + return result +``` + +### 6. Database Access + +Use the global database handle: + +```python +from advanced_omi_backend.database import get_database + +async def save_data(self, data): + db = get_database() + await db['my_collection'].insert_one(data) +``` + +### 7. LLM Access + +Use the global LLM client: + +```python +from advanced_omi_backend.llm_client import async_generate + +async def generate_summary(self, text): + prompt = f"Summarize: {text}" + summary = await async_generate(prompt) + return summary +``` + +## Examples + +### Example 1: Slack Notifier + +```python +class SlackNotifierPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['conversation'] + + async def initialize(self): + self.webhook_url = self.config.get('slack_webhook_url') + if not self.webhook_url: + raise ValueError("Slack webhook URL required") + + async def on_conversation_complete(self, context): + transcript = context.data.get('transcript', '') + duration = context.data.get('duration', 0) + + message = { + "text": f"New conversation ({duration:.1f}s)", + "blocks": [{ + "type": "section", + "text": {"type": "mrkdwn", "text": f"```{transcript[:500]}```"} + }] + } + + async with aiohttp.ClientSession() as session: + await session.post(self.webhook_url, json=message) + + return PluginResult(success=True, message="Notification sent") +``` + +### Example 2: Keyword Alerter + +```python +class KeywordAlerterPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['transcript'] + + async def on_transcript(self, context): + transcript = context.data.get('transcript', '') + keywords = self.config.get('keywords', []) + + for keyword in keywords: + if keyword.lower() in transcript.lower(): + await self.send_alert(keyword, transcript) + return PluginResult( + success=True, + message=f"Alert sent for keyword: {keyword}" + ) + + return PluginResult(success=True) +``` + +### Example 3: Analytics Tracker + +```python +class AnalyticsTrackerPlugin(BasePlugin): + SUPPORTED_ACCESS_LEVELS = ['conversation', 'memory'] + + async def on_conversation_complete(self, context): + duration = context.data.get('duration', 0) + word_count = len(context.data.get('transcript', '').split()) + + await self.track_event('conversation_complete', { + 'user_id': context.user_id, + 'duration': duration, + 'word_count': word_count, + }) + + return PluginResult(success=True) + + async def on_memory_processed(self, context): + memory_count = context.data.get('memory_count', 0) + + await self.track_event('memory_processed', { + 'user_id': context.user_id, + 'memory_count': memory_count, + }) + + return PluginResult(success=True) +``` + +## Troubleshooting + +### Plugin Not Loading + +**Check logs**: +```bash +docker compose logs chronicle-backend | grep "plugin" +``` + +**Common issues**: +- Plugin directory name doesn't match class name convention +- Missing `__init__.py` or incorrect exports +- Syntax errors in plugin.py +- Not inheriting from `BasePlugin` + +**Solution**: +1. Verify directory structure matches: `plugins/my_plugin/` +2. Class name should be: `MyPluginPlugin` +3. Export in `__init__.py`: `from .plugin import MyPluginPlugin` + +### Plugin Enabled But Not Executing + +**Check**: +- Plugin enabled in `plugins.yml` +- Correct events subscribed +- Condition matches (wake_word, regex, etc.) + +**Debug**: +```python +async def on_conversation_complete(self, context): + logger.info(f"Plugin executed! Context: {context}") + # Your logic +``` + +### Configuration Errors + +**Error**: `Environment variable not found` + +**Solution**: +- Add variable to `.env` file +- Use default values: `${VAR:-default}` +- Check variable name spelling + +### Import Errors + +**Error**: `ModuleNotFoundError` + +**Solution**: +- Restart backend after adding dependencies +- Verify imports are from correct modules +- Check relative imports use `..base` for base classes + +### Database Connection Issues + +**Error**: `Database connection failed` + +**Solution**: +```python +from advanced_omi_backend.database import get_database + +async def my_method(self): + db = get_database() # Global database handle + # Use db... +``` + +## Advanced Topics + +### Custom Conditions + +Implement custom condition checking: + +```python +async def on_conversation_complete(self, context): + # Custom condition check + if not self._should_execute(context): + return PluginResult(success=True, message="Skipped") + + # Your logic + ... + +def _should_execute(self, context): + # Custom logic + duration = context.data.get('duration', 0) + return duration > 60 # Only process long conversations +``` + +### Plugin Dependencies + +Share data between plugins using context metadata: + +```python +# Plugin A +async def on_conversation_complete(self, context): + context.metadata['extracted_keywords'] = ['important', 'urgent'] + return PluginResult(success=True) + +# Plugin B (executes after Plugin A) +async def on_conversation_complete(self, context): + keywords = context.metadata.get('extracted_keywords', []) + # Use keywords... +``` + +### External Service Integration + +```python +import aiohttp + +class ExternalServicePlugin(BasePlugin): + async def initialize(self): + self.session = aiohttp.ClientSession() + self.api_url = self.config.get('api_url') + self.api_key = self.config.get('api_key') + + async def cleanup(self): + await self.session.close() + + async def on_conversation_complete(self, context): + async with self.session.post( + self.api_url, + headers={'Authorization': f'Bearer {self.api_key}'}, + json={'transcript': context.data.get('transcript')} + ) as response: + result = await response.json() + return PluginResult(success=True, data=result) +``` + +## Resources + +- **Base Plugin Class**: `backends/advanced/src/advanced_omi_backend/plugins/base.py` +- **Example Plugins**: + - Email Summarizer: `plugins/email_summarizer/` + - Home Assistant: `plugins/homeassistant/` + - Test Event: `plugins/test_event/` +- **Plugin Generator**: `scripts/create_plugin.py` +- **Configuration**: `config/plugins.yml.template` + +## Contributing Plugins + +Want to share your plugin with the community? + +1. Create a well-documented plugin +2. Add comprehensive README +3. Include configuration examples +4. Test thoroughly +5. Submit PR to Chronicle repository + +## Support + +- **GitHub Issues**: [chronicle-ai/chronicle/issues](https://github.com/chronicle-ai/chronicle/issues) +- **Discussions**: [chronicle-ai/chronicle/discussions](https://github.com/chronicle-ai/chronicle/discussions) +- **Documentation**: [Chronicle Docs](https://github.com/chronicle-ai/chronicle) + +Happy plugin development! 🚀 diff --git a/backends/advanced/scripts/create_plugin.py b/backends/advanced/scripts/create_plugin.py new file mode 100755 index 00000000..f0a2bf21 --- /dev/null +++ b/backends/advanced/scripts/create_plugin.py @@ -0,0 +1,431 @@ +#!/usr/bin/env python3 +""" +Plugin Generator Script for Chronicle. + +Creates boilerplate plugin structure with templates and examples. + +Usage: + uv run python scripts/create_plugin.py my_awesome_plugin +""" +import argparse +import os +import sys +from pathlib import Path + + +def snake_to_pascal(snake_str: str) -> str: + """Convert snake_case to PascalCase.""" + return ''.join(word.capitalize() for word in snake_str.split('_')) + + +def create_plugin(plugin_name: str, force: bool = False): + """ + Create a new plugin with boilerplate structure. + + Args: + plugin_name: Plugin name in snake_case (e.g., my_awesome_plugin) + force: Overwrite existing plugin if True + """ + # Validate plugin name + if not plugin_name.replace('_', '').isalnum(): + print(f"❌ Error: Plugin name must be alphanumeric with underscores") + print(f" Got: {plugin_name}") + print(f" Example: my_awesome_plugin") + sys.exit(1) + + # Convert to class name + class_name = snake_to_pascal(plugin_name) + 'Plugin' + + # Get plugins directory + script_dir = Path(__file__).parent + backend_dir = script_dir.parent + plugins_dir = backend_dir / 'src' / 'advanced_omi_backend' / 'plugins' + plugin_dir = plugins_dir / plugin_name + + # Check if plugin already exists + if plugin_dir.exists() and not force: + print(f"❌ Error: Plugin '{plugin_name}' already exists at {plugin_dir}") + print(f" Use --force to overwrite") + sys.exit(1) + + # Create plugin directory + print(f"📁 Creating plugin directory: {plugin_dir}") + plugin_dir.mkdir(parents=True, exist_ok=True) + + # Create __init__.py + init_content = f'''""" +{class_name} for Chronicle. + +[Brief description of what your plugin does] +""" + +from .plugin import {class_name} + +__all__ = ['{class_name}'] +''' + + init_file = plugin_dir / '__init__.py' + print(f"📝 Creating {init_file}") + init_file.write_text(init_content) + + # Create plugin.py with template + plugin_content = f'''""" +{class_name} implementation. + +This plugin [describe what it does]. +""" +import logging +from typing import Any, Dict, List, Optional + +from ..base import BasePlugin, PluginContext, PluginResult + +logger = logging.getLogger(__name__) + + +class {class_name}(BasePlugin): + """ + [Plugin description] + + Subscribes to: [list events you want to subscribe to] + - transcript.streaming: Real-time transcript segments + - conversation.complete: When conversation finishes + - memory.processed: After memory extraction + + Configuration (config/plugins.yml): + {plugin_name}: + enabled: true + events: + - conversation.complete # Change to your event + condition: + type: always # or wake_word, regex, etc. + # Your custom config here: + my_setting: ${{MY_ENV_VAR}} + """ + + # Declare which access levels this plugin supports + # Options: 'transcript', 'conversation', 'memory' + SUPPORTED_ACCESS_LEVELS: List[str] = ['conversation'] + + def __init__(self, config: Dict[str, Any]): + """ + Initialize plugin with configuration. + + Args: + config: Plugin configuration from config/plugins.yml + """ + super().__init__(config) + + # Load your custom configuration + self.my_setting = config.get('my_setting', 'default_value') + + logger.info(f"{class_name} configuration loaded") + + async def initialize(self): + """ + Initialize plugin resources. + + Called during application startup. + Use this to: + - Connect to external services + - Initialize clients + - Validate configuration + - Set up resources + + Raises: + Exception: If initialization fails + """ + if not self.enabled: + logger.info(f"{class_name} is disabled, skipping initialization") + return + + logger.info(f"Initializing {class_name}...") + + # TODO: Add your initialization code here + # Example: + # self.client = SomeClient(self.my_setting) + # await self.client.connect() + + logger.info(f"✅ {class_name} initialized successfully") + + async def cleanup(self): + """ + Clean up plugin resources. + + Called during application shutdown. + Use this to: + - Close connections + - Save state + - Release resources + """ + logger.info(f"{class_name} cleanup complete") + + # Implement the methods for events you subscribed to: + + async def on_transcript(self, context: PluginContext) -> Optional[PluginResult]: + """ + Handle transcript.streaming events. + + Context data contains: + - transcript: str - The transcript text + - segment_id: str - Unique segment identifier + - conversation_id: str - Current conversation ID + + For wake_word conditions, router adds: + - command: str - Command with wake word stripped + - original_transcript: str - Full transcript + + Args: + context: Plugin context with transcript data + + Returns: + PluginResult with success status and optional message + """ + # TODO: Implement if you subscribed to transcript.streaming + pass + + async def on_conversation_complete(self, context: PluginContext) -> Optional[PluginResult]: + """ + Handle conversation.complete events. + + Context data contains: + - conversation: dict - Full conversation data + - transcript: str - Complete transcript + - duration: float - Conversation duration + - conversation_id: str - Conversation identifier + + Args: + context: Plugin context with conversation data + + Returns: + PluginResult with success status and optional message + """ + try: + logger.info(f"Processing conversation complete event for user: {{context.user_id}}") + + # Extract data from context + conversation = context.data.get('conversation', {{}}) + transcript = context.data.get('transcript', '') + duration = context.data.get('duration', 0) + conversation_id = context.data.get('conversation_id', 'unknown') + + # TODO: Add your plugin logic here + # Example: + # - Process the transcript + # - Call external APIs + # - Store data + # - Trigger actions + + logger.info(f"Processed conversation {{conversation_id}}") + + return PluginResult( + success=True, + message="Processing complete", + data={{'conversation_id': conversation_id}} + ) + + except Exception as e: + logger.error(f"Error in {class_name}: {{e}}", exc_info=True) + return PluginResult( + success=False, + message=f"Error: {{str(e)}}" + ) + + async def on_memory_processed(self, context: PluginContext) -> Optional[PluginResult]: + """ + Handle memory.processed events. + + Context data contains: + - memories: list - Extracted memories + - conversation: dict - Source conversation + - memory_count: int - Number of memories created + - conversation_id: str - Conversation identifier + + Args: + context: Plugin context with memory data + + Returns: + PluginResult with success status and optional message + """ + # TODO: Implement if you subscribed to memory.processed + pass + + # Add your custom helper methods here: + + async def _my_helper_method(self, data: Any) -> Any: + """ + Example helper method. + + Args: + data: Input data + + Returns: + Processed data + """ + # TODO: Implement your helper logic + pass +''' + + plugin_file = plugin_dir / 'plugin.py' + print(f"📝 Creating {plugin_file}") + plugin_file.write_text(plugin_content) + + # Create README.md + readme_content = f'''# {class_name} + +[Brief description of what your plugin does] + +## Features + +- Feature 1 +- Feature 2 +- Feature 3 + +## Configuration + +### Step 1: Environment Variables + +Add to `backends/advanced/.env`: + +```bash +# {class_name} Configuration +MY_ENV_VAR=your-value-here +``` + +### Step 2: Plugin Configuration + +Add to `config/plugins.yml`: + +```yaml +plugins: + {plugin_name}: + enabled: true + events: + - conversation.complete # Change to your event + condition: + type: always + + # Your custom configuration + my_setting: ${{MY_ENV_VAR}} +``` + +### Step 3: Restart Backend + +```bash +cd backends/advanced +docker compose restart +``` + +## How It Works + +1. [Step 1 description] +2. [Step 2 description] +3. [Step 3 description] + +## Configuration Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `my_setting` | string | `default` | Description of setting | + +## Testing + +```bash +# Add testing instructions here +``` + +## Troubleshooting + +### Issue 1 + +Solution 1 + +### Issue 2 + +Solution 2 + +## Development + +### File Structure + +``` +plugins/{plugin_name}/ +├── __init__.py # Plugin exports +├── plugin.py # Main plugin logic +└── README.md # This file +``` + +## License + +MIT License - see project LICENSE file for details. +''' + + readme_file = plugin_dir / 'README.md' + print(f"📝 Creating {readme_file}") + readme_file.write_text(readme_content) + + # Print success message and next steps + print(f"\n✅ Plugin '{plugin_name}' created successfully!\n") + print(f"📁 Location: {plugin_dir}\n") + print(f"📋 Next steps:") + print(f" 1. Edit {plugin_file}") + print(f" - Implement your plugin logic") + print(f" - Choose which events to subscribe to") + print(f" - Add your configuration options") + print(f"") + print(f" 2. Update config/plugins.yml:") + print(f" ```yaml") + print(f" plugins:") + print(f" {plugin_name}:") + print(f" enabled: true") + print(f" events:") + print(f" - conversation.complete") + print(f" condition:") + print(f" type: always") + print(f" ```") + print(f"") + print(f" 3. Add environment variables to .env (if needed)") + print(f"") + print(f" 4. Restart backend:") + print(f" cd backends/advanced && docker compose restart") + print(f"") + print(f"📖 Resources:") + print(f" - Plugin development guide: docs/plugin-development-guide.md") + print(f" - Example plugin: plugins/email_summarizer/") + print(f" - Base plugin class: plugins/base.py") + + +def main(): + parser = argparse.ArgumentParser( + description='Create a new Chronicle plugin with boilerplate structure', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Examples: + uv run python scripts/create_plugin.py my_awesome_plugin + uv run python scripts/create_plugin.py slack_notifier + uv run python scripts/create_plugin.py todo_extractor --force + ''' + ) + parser.add_argument( + 'plugin_name', + help='Plugin name in snake_case (e.g., my_awesome_plugin)' + ) + parser.add_argument( + '--force', '-f', + action='store_true', + help='Overwrite existing plugin if it exists' + ) + + args = parser.parse_args() + + try: + create_plugin(args.plugin_name, force=args.force) + except KeyboardInterrupt: + print("\n\n❌ Plugin creation cancelled") + sys.exit(1) + except Exception as e: + print(f"\n❌ Error creating plugin: {e}") + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/README.md b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/README.md new file mode 100644 index 00000000..2df9a90b --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/README.md @@ -0,0 +1,233 @@ +# Email Summarizer Plugin + +Automatically sends email summaries when conversations complete. + +## Features + +- 📧 **Automatic Email Delivery**: Sends emails when conversations finish +- 🤖 **LLM-Powered Summaries**: Uses your configured LLM to generate intelligent summaries +- 🎨 **Beautiful HTML Emails**: Professional-looking emails with proper formatting +- 📱 **Plain Text Fallback**: Ensures compatibility with all email clients +- ⚡ **Async Processing**: Non-blocking email sending +- 🔒 **Secure SMTP**: TLS/SSL encryption support + +## How It Works + +1. User completes a conversation (via OMI device or file upload) +2. Plugin receives `conversation.complete` event +3. Retrieves user email from database +4. Generates LLM summary (2-3 sentences) +5. Formats beautiful HTML and plain text emails +6. Sends email via configured SMTP server + +## Configuration + +### Step 1: Get SMTP Credentials + +#### For Gmail (Recommended for Testing): + +1. **Enable 2-Factor Authentication** on your Google account +2. Go to Google Account → Security → 2-Step Verification +3. Scroll down to **App passwords** +4. Generate an app password for "Mail" +5. Copy the 16-character password (no spaces) + +#### For Other Providers: + +- **Outlook/Hotmail**: smtp.office365.com:587 +- **Yahoo**: smtp.mail.yahoo.com:587 +- **Custom SMTP**: Use your provider's settings + +### Step 2: Configure Environment Variables + +Add to `backends/advanced/.env`: + +```bash +# Email Summarizer Plugin +SMTP_HOST=smtp.gmail.com +SMTP_PORT=587 +SMTP_USERNAME=your-email@gmail.com +SMTP_PASSWORD=your-app-password-here # Gmail App Password (16 chars, no spaces) +SMTP_USE_TLS=true +FROM_EMAIL=noreply@chronicle.ai +FROM_NAME=Chronicle AI +``` + +### Step 3: Enable Plugin + +Add to `config/plugins.yml`: + +```yaml +plugins: + email_summarizer: + enabled: true + events: + - conversation.complete + condition: + type: always + + # SMTP Configuration + smtp_host: ${SMTP_HOST:-smtp.gmail.com} + smtp_port: ${SMTP_PORT:-587} + smtp_username: ${SMTP_USERNAME} + smtp_password: ${SMTP_PASSWORD} + smtp_use_tls: ${SMTP_USE_TLS:-true} + from_email: ${FROM_EMAIL:-noreply@chronicle.ai} + from_name: ${FROM_NAME:-Chronicle AI} + + # Email Content Options + subject_prefix: "Conversation Summary" + summary_max_sentences: 3 + include_conversation_id: true + include_duration: true +``` + +### Step 4: Restart Backend + +```bash +cd backends/advanced +docker compose restart +``` + +## Configuration Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `smtp_host` | string | `smtp.gmail.com` | SMTP server hostname | +| `smtp_port` | integer | `587` | SMTP server port (587 for TLS, 465 for SSL) | +| `smtp_username` | string | **Required** | SMTP authentication username | +| `smtp_password` | string | **Required** | SMTP authentication password | +| `smtp_use_tls` | boolean | `true` | Use STARTTLS encryption | +| `from_email` | string | **Required** | Sender email address | +| `from_name` | string | `Chronicle AI` | Sender display name | +| `subject_prefix` | string | `Conversation Summary` | Email subject prefix | +| `summary_max_sentences` | integer | `3` | Maximum sentences in LLM summary | +| `include_conversation_id` | boolean | `true` | Show conversation ID in email | +| `include_duration` | boolean | `true` | Show conversation duration | + +## Email Template + +### Subject Line +``` +Conversation Summary - Jan 15, 2025 at 10:30 AM +``` + +### Email Body +``` +📋 SUMMARY +[LLM-generated 2-3 sentence summary of key points] + +📝 FULL TRANSCRIPT +[Complete conversation transcript] + +📊 METADATA +Duration: 5m 30s +Conversation ID: 507f1f77bc... +``` + +## Testing + +### Test SMTP Connection + +```bash +cd backends/advanced +uv run python -m advanced_omi_backend.services.email_service +``` + +This will: +- Test SMTP connectivity +- Send a test email to your SMTP username +- Verify configuration + +### Test Plugin Integration + +1. Start the backend with plugin enabled +2. Upload a test audio file or use OMI device +3. Wait for conversation to complete +4. Check your email inbox + +## Troubleshooting + +### "Authentication failed" + +**For Gmail:** +- Make sure you're using an **App Password**, not your regular password +- Enable 2-Factor Authentication first +- App password should be 16 characters (xxxx xxxx xxxx xxxx) + +**For other providers:** +- Verify username and password are correct +- Check if "less secure apps" needs to be enabled + +### "Connection timeout" + +- Check `smtp_host` and `smtp_port` are correct +- Verify firewall allows outbound SMTP connections +- Try port 465 with SSL instead of 587 with TLS + +### "No email received" + +- Check user has email configured in database +- Look for plugin logs: `docker compose logs -f chronicle-backend | grep EmailSummarizer` +- Verify plugin is enabled in `plugins.yml` +- Check spam/junk folder + +### "Empty summary" or "LLM error" + +- Verify LLM service is configured and running +- Check LLM API keys are valid +- Plugin will fall back to truncated transcript if LLM fails + +## Security Considerations + +1. **Never commit SMTP passwords** to git +2. **Use environment variables** for sensitive config +3. **Enable TLS/SSL** for encrypted connections +4. **Gmail App Passwords** are safer than account passwords +5. **Rotate credentials** periodically + +## Development + +### File Structure + +``` +plugins/email_summarizer/ +├── __init__.py # Plugin exports +├── plugin.py # Main plugin logic +├── templates.py # Email HTML/text templates +└── README.md # This file +``` + +### Key Methods + +- `on_conversation_complete()` - Main event handler +- `_get_user_email()` - Fetch user email from database +- `_generate_summary()` - Generate LLM summary with fallback +- `_format_subject()` - Format email subject line + +### Dependencies + +- `advanced_omi_backend.database` - MongoDB access +- `advanced_omi_backend.llm_client` - LLM generation +- `advanced_omi_backend.services.email_service` - SMTP email sending + +## Future Enhancements + +- [ ] Email templates customization +- [ ] User preference for email frequency +- [ ] Unsubscribe link +- [ ] Email digests (daily/weekly summaries) +- [ ] Rich formatting for action items +- [ ] Attachment support (audio files) +- [ ] Multiple recipient support +- [ ] Email open tracking + +## Support + +- **Issues**: [GitHub Issues](https://github.com/chronicle-ai/chronicle/issues) +- **Discussions**: [GitHub Discussions](https://github.com/chronicle-ai/chronicle/discussions) +- **Documentation**: [Chronicle Docs](https://github.com/chronicle-ai/chronicle) + +## License + +MIT License - see project LICENSE file for details. diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/__init__.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/__init__.py new file mode 100644 index 00000000..525acd51 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/__init__.py @@ -0,0 +1,9 @@ +""" +Email Summarizer Plugin for Chronicle. + +Automatically sends email summaries when conversations complete. +""" + +from .plugin import EmailSummarizerPlugin + +__all__ = ['EmailSummarizerPlugin'] diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/email_service.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/email_service.py new file mode 100644 index 00000000..264b4f96 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/email_service.py @@ -0,0 +1,219 @@ +""" +SMTP Email Service for Chronicle. + +Provides email sending functionality via SMTP protocol with support for: +- HTML and plain text emails +- TLS/SSL encryption +- Gmail and other SMTP providers +- Async implementation +""" +import asyncio +import logging +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +class SMTPEmailService: + """SMTP email service for sending emails via SMTP protocol.""" + + def __init__(self, config: Dict[str, Any]): + """ + Initialize SMTP email service with configuration. + + Args: + config: SMTP configuration containing: + - smtp_host: SMTP server hostname + - smtp_port: SMTP server port (default: 587) + - smtp_username: SMTP username + - smtp_password: SMTP password + - smtp_use_tls: Whether to use TLS (default: True) + - from_email: Sender email address + - from_name: Sender display name (default: 'Chronicle AI') + """ + self.host = config.get('smtp_host') + self.port = config.get('smtp_port', 587) + self.username = config.get('smtp_username') + self.password = config.get('smtp_password') + self.use_tls = config.get('smtp_use_tls', True) + self.from_email = config.get('from_email') + self.from_name = config.get('from_name', 'Chronicle AI') + + # Validate required configuration + if not all([self.host, self.username, self.password, self.from_email]): + raise ValueError( + "SMTP configuration incomplete. Required: smtp_host, smtp_username, " + "smtp_password, from_email" + ) + + logger.info( + f"SMTP Email Service initialized: {self.username}@{self.host}:{self.port} " + f"(TLS: {self.use_tls})" + ) + + async def send_email( + self, + to_email: str, + subject: str, + body_text: str, + body_html: Optional[str] = None + ) -> bool: + """ + Send email via SMTP with HTML/text support. + + Args: + to_email: Recipient email address + subject: Email subject line + body_text: Plain text email body + body_html: Optional HTML email body + + Returns: + True if email sent successfully, False otherwise + """ + try: + # Create message container + msg = MIMEMultipart('alternative') + msg['Subject'] = subject + msg['From'] = f"{self.from_name} <{self.from_email}>" + msg['To'] = to_email + + # Attach plain text version + text_part = MIMEText(body_text, 'plain') + msg.attach(text_part) + + # Attach HTML version if provided + if body_html: + html_part = MIMEText(body_html, 'html') + msg.attach(html_part) + + # Send email asynchronously (run in thread pool to avoid blocking) + await asyncio.to_thread(self._send_smtp, msg, to_email) + + logger.info(f"✅ Email sent successfully to {to_email}: {subject}") + return True + + except Exception as e: + logger.error(f"Failed to send email to {to_email}: {e}", exc_info=True) + return False + + def _send_smtp(self, msg: MIMEMultipart, to_email: str) -> None: + """ + Internal method to send email via SMTP (blocking). + + Args: + msg: MIME message to send + to_email: Recipient email address + + Raises: + Exception: If SMTP sending fails + """ + # Connect to SMTP server + if self.use_tls: + # Use STARTTLS (most common for port 587) + smtp_server = smtplib.SMTP(self.host, self.port) + smtp_server.ehlo() + smtp_server.starttls() + smtp_server.ehlo() + else: + # Direct connection (for port 465 SSL or no encryption) + smtp_server = smtplib.SMTP(self.host, self.port) + + try: + # Login and send + smtp_server.login(self.username, self.password) + smtp_server.send_message(msg) + logger.debug(f"SMTP send completed for {to_email}") + finally: + smtp_server.quit() + + async def test_connection(self) -> bool: + """ + Test SMTP connectivity and authentication. + + Returns: + True if connection successful, False otherwise + """ + try: + await asyncio.to_thread(self._test_smtp_connection) + logger.info(f"✅ SMTP connection test successful: {self.username}@{self.host}") + return True + except Exception as e: + logger.error(f"SMTP connection test failed: {e}", exc_info=True) + return False + + def _test_smtp_connection(self) -> None: + """ + Internal method to test SMTP connection (blocking). + + Raises: + Exception: If connection fails + """ + if self.use_tls: + smtp_server = smtplib.SMTP(self.host, self.port, timeout=10) + smtp_server.ehlo() + smtp_server.starttls() + smtp_server.ehlo() + else: + smtp_server = smtplib.SMTP(self.host, self.port, timeout=10) + + try: + smtp_server.login(self.username, self.password) + logger.debug("SMTP authentication successful") + finally: + smtp_server.quit() + + +# Test script for development/debugging +async def main(): + """Test the SMTP email service.""" + import os + from dotenv import load_dotenv + + load_dotenv() + + config = { + 'smtp_host': os.getenv('SMTP_HOST', 'smtp.gmail.com'), + 'smtp_port': int(os.getenv('SMTP_PORT', 587)), + 'smtp_username': os.getenv('SMTP_USERNAME'), + 'smtp_password': os.getenv('SMTP_PASSWORD'), + 'smtp_use_tls': os.getenv('SMTP_USE_TLS', 'true').lower() == 'true', + 'from_email': os.getenv('FROM_EMAIL', 'noreply@chronicle.ai'), + 'from_name': os.getenv('FROM_NAME', 'Chronicle AI'), + } + + try: + service = SMTPEmailService(config) + + # Test connection + print("Testing SMTP connection...") + if await service.test_connection(): + print("✅ Connection test passed") + else: + print("❌ Connection test failed") + return + + # Send test email + test_email = config['smtp_username'] # Send to self + print(f"\nSending test email to {test_email}...") + + success = await service.send_email( + to_email=test_email, + subject="Chronicle Email Service Test", + body_text="This is a test email from Chronicle Email Service.\n\nIf you received this, the email service is working correctly!", + body_html="
This is a test email from Chronicle Email Service.
If you received this, the email service is working correctly!
" + ) + + if success: + print("✅ Test email sent successfully") + else: + print("❌ Failed to send test email") + + except Exception as e: + print(f"❌ Error: {e}") + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/plugin.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/plugin.py new file mode 100644 index 00000000..161310e4 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/plugin.py @@ -0,0 +1,281 @@ +""" +Email Summarizer Plugin for Chronicle. + +Automatically sends email summaries when conversations complete. +""" +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional + +from advanced_omi_backend.database import get_database +from advanced_omi_backend.llm_client import async_generate + +from ..base import BasePlugin, PluginContext, PluginResult +from .email_service import SMTPEmailService +from .templates import format_html_email, format_text_email + +logger = logging.getLogger(__name__) + + +class EmailSummarizerPlugin(BasePlugin): + """ + Plugin for sending email summaries when conversations complete. + + Subscribes to conversation.complete events and: + 1. Retrieves user email from database + 2. Generates LLM summary of the conversation + 3. Formats HTML and plain text emails + 4. Sends email via SMTP + + Configuration (config/plugins.yml): + enabled: true + events: + - conversation.complete + condition: + type: always + smtp_host: smtp.gmail.com + smtp_port: 587 + smtp_username: ${SMTP_USERNAME} + smtp_password: ${SMTP_PASSWORD} + smtp_use_tls: true + from_email: noreply@chronicle.ai + from_name: Chronicle AI + subject_prefix: "Conversation Summary" + summary_max_sentences: 3 + """ + + SUPPORTED_ACCESS_LEVELS: List[str] = ['conversation'] + + def __init__(self, config: Dict[str, Any]): + """ + Initialize Email Summarizer plugin. + + Args: + config: Plugin configuration from config/plugins.yml + """ + super().__init__(config) + + self.subject_prefix = config.get('subject_prefix', 'Conversation Summary') + self.summary_max_sentences = config.get('summary_max_sentences', 3) + self.include_conversation_id = config.get('include_conversation_id', True) + self.include_duration = config.get('include_duration', True) + + # Email service will be initialized in initialize() + self.email_service: Optional[SMTPEmailService] = None + + # MongoDB database handle + self.db = None + + async def initialize(self): + """ + Initialize plugin resources. + + Sets up SMTP email service and MongoDB connection. + + Raises: + ValueError: If SMTP configuration is incomplete + Exception: If email service initialization fails + """ + if not self.enabled: + logger.info("Email Summarizer plugin is disabled, skipping initialization") + return + + logger.info("Initializing Email Summarizer plugin...") + + # Initialize SMTP email service + try: + smtp_config = { + 'smtp_host': self.config.get('smtp_host'), + 'smtp_port': self.config.get('smtp_port', 587), + 'smtp_username': self.config.get('smtp_username'), + 'smtp_password': self.config.get('smtp_password'), + 'smtp_use_tls': self.config.get('smtp_use_tls', True), + 'from_email': self.config.get('from_email'), + 'from_name': self.config.get('from_name', 'Chronicle AI'), + } + + self.email_service = SMTPEmailService(smtp_config) + + # Test SMTP connection + logger.info("Testing SMTP connectivity...") + if await self.email_service.test_connection(): + logger.info("✅ SMTP connection test successful") + else: + raise Exception("SMTP connection test failed") + + except Exception as e: + logger.error(f"Failed to initialize email service: {e}") + raise + + # Get MongoDB database handle + self.db = get_database() + logger.info("✅ Email Summarizer plugin initialized successfully") + + async def cleanup(self): + """Clean up plugin resources.""" + logger.info("Email Summarizer plugin cleanup complete") + + async def on_conversation_complete(self, context: PluginContext) -> Optional[PluginResult]: + """ + Send email summary when conversation completes. + + Args: + context: Plugin context with conversation data + - conversation: dict - Full conversation data + - transcript: str - Complete transcript + - duration: float - Conversation duration + - conversation_id: str - Conversation identifier + + Returns: + PluginResult with success status and message + """ + try: + logger.info(f"Processing conversation complete event for user: {context.user_id}") + + # Extract conversation data + conversation = context.data.get('conversation', {}) + transcript = context.data.get('transcript', '') + duration = context.data.get('duration', 0) + conversation_id = context.data.get('conversation_id', 'unknown') + created_at = conversation.get('created_at') + + # Validate transcript exists + if not transcript or transcript.strip() == '': + logger.warning(f"Empty transcript for conversation {conversation_id}, skipping email") + return PluginResult( + success=False, + message="Skipped: Empty transcript" + ) + + # Get user email from database + user_email = await self._get_user_email(context.user_id) + if not user_email: + logger.warning(f"No email found for user {context.user_id}, cannot send summary") + return PluginResult( + success=False, + message=f"No email configured for user {context.user_id}" + ) + + # Generate LLM summary + summary = await self._generate_summary(transcript) + + # Format email subject and body + subject = self._format_subject(created_at) + body_html = format_html_email( + summary=summary, + transcript=transcript, + conversation_id=conversation_id, + duration=duration, + created_at=created_at + ) + body_text = format_text_email( + summary=summary, + transcript=transcript, + conversation_id=conversation_id, + duration=duration, + created_at=created_at + ) + + # Send email + success = await self.email_service.send_email( + to_email=user_email, + subject=subject, + body_text=body_text, + body_html=body_html + ) + + if success: + logger.info(f"✅ Email summary sent to {user_email} for conversation {conversation_id}") + return PluginResult( + success=True, + message=f"Email sent to {user_email}", + data={'recipient': user_email, 'conversation_id': conversation_id} + ) + else: + logger.error(f"Failed to send email to {user_email}") + return PluginResult( + success=False, + message=f"Failed to send email to {user_email}" + ) + + except Exception as e: + logger.error(f"Error in email summarizer plugin: {e}", exc_info=True) + return PluginResult( + success=False, + message=f"Error: {str(e)}" + ) + + async def _get_user_email(self, user_id: str) -> Optional[str]: + """ + Get user email from database. + + Args: + user_id: User identifier (MongoDB ObjectId) + + Returns: + User email if found, None otherwise + """ + try: + from bson import ObjectId + + # Query users collection + user = await self.db['users'].find_one({'_id': ObjectId(user_id)}) + + if user and 'email' in user: + return user['email'] + + logger.warning(f"User {user_id} not found or has no email") + return None + + except Exception as e: + logger.error(f"Error fetching user email: {e}", exc_info=True) + return None + + async def _generate_summary(self, transcript: str) -> str: + """ + Generate LLM summary of the conversation. + + Args: + transcript: Full conversation transcript + + Returns: + Generated summary (2-3 sentences) + """ + try: + prompt = ( + f"Summarize this conversation in {self.summary_max_sentences} sentences or less. " + f"Focus on key points, main topics discussed, and any action items or decisions. " + f"Be concise and clear.\n\n" + f"Conversation:\n{transcript}" + ) + + logger.debug("Generating LLM summary...") + summary = await async_generate(prompt) + + if not summary or summary.strip() == '': + raise ValueError("LLM returned empty summary") + + logger.info("✅ LLM summary generated successfully") + return summary.strip() + + except Exception as e: + logger.error(f"Failed to generate LLM summary: {e}", exc_info=True) + # Fallback: return first 300 characters of transcript + logger.warning("Using fallback: truncated transcript") + return transcript[:300] + "..." if len(transcript) > 300 else transcript + + def _format_subject(self, created_at: Optional[datetime] = None) -> str: + """ + Format email subject line. + + Args: + created_at: Conversation creation timestamp + + Returns: + Formatted subject line + """ + if created_at: + date_str = created_at.strftime("%b %d, %Y at %I:%M %p") + return f"{self.subject_prefix} - {date_str}" + else: + return self.subject_prefix diff --git a/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/templates.py b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/templates.py new file mode 100644 index 00000000..706a87dd --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/plugins/email_summarizer/templates.py @@ -0,0 +1,253 @@ +""" +Email templates for the Email Summarizer plugin. + +Provides HTML and plain text email templates. +""" +from datetime import datetime +from typing import Optional + + +def format_duration(seconds: float) -> str: + """ + Format duration in seconds to human-readable format. + + Args: + seconds: Duration in seconds + + Returns: + Formatted duration (e.g., "5m 30s", "1h 15m") + """ + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + + if hours > 0: + return f"{hours}h {minutes}m" + elif minutes > 0: + return f"{minutes}m {secs}s" + else: + return f"{secs}s" + + +def format_html_email( + summary: str, + transcript: str, + conversation_id: str, + duration: float, + created_at: Optional[datetime] = None +) -> str: + """ + Format HTML email template. + + Args: + summary: LLM-generated summary + transcript: Full conversation transcript + conversation_id: Conversation identifier + duration: Conversation duration in seconds + created_at: Conversation creation timestamp + + Returns: + HTML email body + """ + formatted_duration = format_duration(duration) + date_str = created_at.strftime("%B %d, %Y at %I:%M %p") if created_at else "N/A" + + # Format transcript with line breaks + transcript_html = transcript.replace('\n', '{summary}
+