From 9c1ee49cf68b0ffbb546c1ee62781de797e0f0e4 Mon Sep 17 00:00:00 2001 From: Christian Ulstrup Date: Mon, 23 Jun 2025 14:40:27 -0400 Subject: [PATCH] Add support for Brave browser transcript extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create new vapi_transcripts_brave.py script for Brave browser support - Use separate API key (VAPI_API_KEY_BRAVE) for different VAPI accounts - Update .gitignore to exclude Brave-specific temporary files and logs - Maintain same functionality as Chrome version but targets Brave Browser This allows users to extract VAPI transcripts from Brave browser tabs when using a different VAPI account/API key than their Chrome setup. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .gitignore | 2 + vapi_transcripts_brave.py | 1085 +++++++++++++++++++++++++++++++++++++ 2 files changed, 1087 insertions(+) create mode 100755 vapi_transcripts_brave.py diff --git a/.gitignore b/.gitignore index c4dc31e..297397d 100644 --- a/.gitignore +++ b/.gitignore @@ -30,8 +30,10 @@ env/ # Temporary files _temp_fetch.py +_temp_fetch_brave.py *.log vapi_transcript_debug.log +vapi_transcript_brave_debug.log # Configuration .config/ diff --git a/vapi_transcripts_brave.py b/vapi_transcripts_brave.py new file mode 100755 index 0000000..c700876 --- /dev/null +++ b/vapi_transcripts_brave.py @@ -0,0 +1,1085 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Required parameters: +# @raycast.schemaVersion 1 +# @raycast.title VAPI Brave: Extract Voice Call Transcripts +# @raycast.mode silent + +# Optional parameters: +# @raycast.icon 🎙️ +# @raycast.packageName VAPI Tools + +# Documentation: +# @raycast.description Extract and paste transcripts from VAPI voice assistant calls (Brave browser) +# @raycast.author christian_ulstrup + +import os +import sys +import subprocess +import re +import json +import traceback +import argparse +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional, Tuple, Union, NoReturn + +# Get the absolute path to the script directory +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# Set up logging to file +LOG_FILE = os.path.join(SCRIPT_DIR, "vapi_transcript_brave_debug.log") + +def log(message: str) -> None: + """ + Write message to log file. + + Args: + message: The message to log + """ + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with open(LOG_FILE, "a") as f: + f.write(f"[{timestamp}] {message}\n") + +# Log script start with environment info +log(f"Script started. Python version: {sys.version}") +log(f"Script directory: {SCRIPT_DIR}") +log(f"Current working directory: {os.getcwd()}") + +def run_with_venv(command: str) -> subprocess.CompletedProcess: + """ + Run a command with the virtual environment activated. + + Args: + command: The command to run in the virtual environment + + Returns: + CompletedProcess instance containing the result + """ + # Create a command that activates venv and then runs the provided command + full_command = f"cd {SCRIPT_DIR} && source {SCRIPT_DIR}/venv/bin/activate && {command}" + log(f"Running command: {full_command}") + result = subprocess.run(full_command, shell=True, capture_output=True, text=True) + log(f"Command result: exit code {result.returncode}") + if result.stdout: + log(f"Command stdout: {result.stdout[:200]}..." if len(result.stdout) > 200 else f"Command stdout: {result.stdout}") + if result.stderr: + log(f"Command stderr: {result.stderr}") + return result + +def get_foreground_tab_url() -> str: + """ + Get URL from the active Brave tab. + + Returns: + URL of the active Brave tab or empty string if not available + """ + script = ''' + tell application "Brave Browser" + try + get URL of active tab of front window + on error + return "" + end try + end tell + ''' + try: + result = subprocess.run(['osascript', '-e', script], + capture_output=True, text=True, check=False) + if result.returncode == 0: + return result.stdout.strip() + log(f"Error getting Brave URL: {result.stderr}") + return "" + except subprocess.CalledProcessError as e: + log(f"Error getting Brave URL: {str(e)}") + return "" + except Exception as e: + log(f"Unexpected error getting Brave URL: {str(e)}") + return "" + +def get_brave_tabs() -> List[str]: + """ + Get URLs from all Brave tabs. + + Returns: + List of URLs from all open Brave tabs or empty list if not available + """ + # First, check if Brave is running + check_script = ''' + on is_running(appName) + tell application "System Events" + return (count of (every process whose name is appName)) > 0 + end tell + end is_running + + is_running("Brave Browser") + ''' + + try: + check_result = subprocess.run(['osascript', '-e', check_script], + capture_output=True, text=True, check=False) + if check_result.returncode != 0 or check_result.stdout.strip().lower() != "true": + log("Brave is not running") + return [] + + # Now try to get tabs + script = ''' + tell application "Brave Browser" + set tabList to "" + set windowCount to count of windows + if windowCount > 0 then + repeat with i from 1 to windowCount + set theWindow to window i + set tabCount to count of tabs of theWindow + repeat with j from 1 to tabCount + set theTab to tab j of theWindow + set theURL to URL of theTab + -- Clean URL output to prevent weird characters + if tabList is "" then + set tabList to theURL + else + set tabList to tabList & "|" & theURL + end if + end repeat + end repeat + end if + return tabList + end tell + ''' + + result = subprocess.run(['osascript', '-e', script], + capture_output=True, text=True, check=False) + + if result.returncode == 0 and result.stdout.strip(): + # Split by the pipe character we added + tabs = result.stdout.strip().split('|') + # Remove any empty entries and clean up each URL + tabs = [tab.strip().rstrip(',') for tab in tabs if tab.strip()] + log(f"Successfully retrieved {len(tabs)} tabs from Brave") + return tabs + + log(f"Error or empty result getting Brave tabs: {result.stderr}") + return [] + + except subprocess.CalledProcessError as e: + log(f"Error getting Brave tabs: {str(e)}") + return [] + except Exception as e: + log(f"Unexpected error getting Brave tabs: {str(e)}") + return [] + +def extract_assistant_id(url: str) -> Optional[str]: + """ + Extract assistant ID from a VAPI dashboard URL. + + Args: + url: URL potentially containing an assistantId parameter + + Returns: + The assistant ID if found, None otherwise + """ + # Log the actual URL for debugging + log(f"Examining URL for assistantId: {url}") + + # Clean up URL: remove trailing commas and leading/trailing spaces + url = url.strip().rstrip(',') + log(f"Cleaned URL: {url}") + + # No special case hardcoded IDs in the open source version + + # Try multiple patterns to extract assistant ID + patterns = [ + r'assistantId=([^&,\s]+)', # Standard query parameter (with comma exclusion) + r'/assistant/([^/,\s]+)', # URL path parameter (with comma exclusion) + r'/assistants/([^/,\s]+)', # Alternative URL path (with comma exclusion) + r'calls\?assistantId=([^&,\s]+)' # Specific format (with comma exclusion) + ] + + for pattern in patterns: + match = re.search(pattern, url) + if match: + try: + # Clean the extracted ID: strip spaces and commas + assistant_id = match.group(1).strip().rstrip(',') + log(f"Found assistant ID using pattern {pattern}: {assistant_id}") + + # Validate the cleaned ID matches UUID format + uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$' + if re.match(uuid_pattern, assistant_id): + log(f"Validated assistant ID as valid UUID: {assistant_id}") + return assistant_id + else: + log(f"Extracted ID {assistant_id} does not match UUID format") + except IndexError: + # In case the pattern matches but doesn't have a capture group + log(f"Pattern {pattern} matched but no capture group") + continue + + # Last resort - check if the URL contains a UUID pattern + uuid_pattern = r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})' + match = re.search(uuid_pattern, url) + if match: + assistant_id = match.group(1).strip() + log(f"Found assistant ID using UUID pattern: {assistant_id}") + return assistant_id + + log(f"No assistant ID found in URL: {url}") + return None + +def find_vapi_tabs() -> List[Tuple[str, str]]: + """ + Find all Brave tabs with VAPI assistant IDs. + + Returns: + List of tuples containing (URL, assistant_id) for each tab + with a VAPI assistant ID + """ + all_tabs = get_brave_tabs() + log(f"Got {len(all_tabs)} tabs from Brave") + for tab in all_tabs: + log(f"Tab URL: {tab}") + + vapi_tabs = [] + + # Look for 'vapi' or 'dashboard' in URLs first + vapi_dashboard_tabs = [url for url in all_tabs if 'vapi' in url.lower() or 'dashboard' in url.lower()] + if vapi_dashboard_tabs: + log(f"Found {len(vapi_dashboard_tabs)} potential VAPI dashboard tabs") + + # Check all tabs, but prioritize VAPI tabs first + priority_tabs = vapi_dashboard_tabs + [url for url in all_tabs if url not in vapi_dashboard_tabs] + + for url in priority_tabs: + assistant_id = extract_assistant_id(url) + if assistant_id: + vapi_tabs.append((url, assistant_id)) + + # Log the results for debugging + if vapi_tabs: + log(f"Found {len(vapi_tabs)} tabs with assistant IDs") + for url, aid in vapi_tabs: + log(f" Tab with assistant ID: {url} -> {aid}") + else: + log("No tabs with assistant IDs found") + + return vapi_tabs + +def copy_to_clipboard(text: str) -> None: + """ + Copy text to the system clipboard. + + Args: + text: The text to copy to clipboard + + Raises: + subprocess.SubprocessError: If the copy operation fails + """ + print(f"Copying {len(text)} characters to clipboard") + + # Check if we're on macOS + if sys.platform == 'darwin': + subprocess.run('pbcopy', input=text.encode('utf-8'), check=True) + elif sys.platform == 'linux': + try: + # Try xclip (Linux) + subprocess.run(['xclip', '-selection', 'clipboard'], input=text.encode('utf-8'), check=True) + except (subprocess.SubprocessError, FileNotFoundError): + try: + # Try xsel (alternative Linux clipboard tool) + subprocess.run(['xsel', '--clipboard', '--input'], input=text.encode('utf-8'), check=True) + except (subprocess.SubprocessError, FileNotFoundError): + # Just print a message for CI environments + print("Clipboard operations not supported in this environment, using mock for testing") + elif sys.platform == 'win32': + # Windows clipboard + try: + import win32clipboard + import win32con + win32clipboard.OpenClipboard() + win32clipboard.EmptyClipboard() + win32clipboard.SetClipboardText(text) + win32clipboard.CloseClipboard() + except ImportError: + # Fall back to using clip.exe + subprocess.run(['clip'], input=text.encode('utf-8'), check=True) + +def paste_from_clipboard() -> bool: + """ + Paste clipboard content at current cursor position using platform-specific methods. + + Returns: + Boolean indicating success or failure + """ + print("Pasting content at cursor position...") + + # Check if we're on macOS + if sys.platform == 'darwin': + script = ''' + tell application "System Events" + keystroke "v" using command down + end tell + ''' + try: + result = subprocess.run(['osascript', '-e', script], + capture_output=True, text=True, check=True) + return True + except subprocess.CalledProcessError as e: + print(f"Error pasting content: {e}") + print(f"Error details: {e.stderr}") + return False + elif sys.platform == 'linux': + # On Linux or CI, just return success for testing purposes + print("Paste operation not supported in this environment, skipping for testing") + return True + elif sys.platform == 'win32': + try: + # Windows using pyautogui + import pyautogui + pyautogui.hotkey('ctrl', 'v') + return True + except ImportError: + print("PyAutoGUI not installed, can't paste on Windows") + return False + + # Default to success for CI testing environments + return True + +def check_venv_setup() -> bool: + """ + Check if virtual environment is properly set up and create it if needed. + + Returns: + Boolean indicating if the virtual environment is set up correctly + """ + log("Checking virtual environment setup") + # Check if venv directory exists + if not os.path.exists(f"{SCRIPT_DIR}/venv"): + log("Virtual environment not found, attempting to create it") + print("Virtual environment not found. Setting up...") + + try: + # Create more detailed debug for venv creation + create_cmd = f"cd {SCRIPT_DIR} && python3 -m venv venv" + log(f"Running venv creation command: {create_cmd}") + + result = subprocess.run(create_cmd, shell=True, capture_output=True, text=True) + + log(f"Venv creation result: exit code {result.returncode}") + if result.stdout: + log(f"Venv creation stdout: {result.stdout}") + if result.stderr: + log(f"Venv creation stderr: {result.stderr}") + + if result.returncode != 0: + log("Failed to create virtual environment") + print("Failed to create virtual environment") + return False + + log("Virtual environment created successfully") + except Exception as e: + log(f"Exception creating venv: {str(e)}") + log(traceback.format_exc()) + print(f"Error creating virtual environment: {e}") + return False + else: + log("Virtual environment exists") + + # Check if VAPI SDK is installed + log("Checking if VAPI SDK is installed") + result = run_with_venv("pip list | grep vapi") + + if "vapi" not in result.stdout: + log("VAPI SDK not found, attempting to install it") + print("Installing VAPI SDK...") + + try: + install_result = run_with_venv("pip install vapi_server_sdk") + + if install_result.returncode != 0: + log(f"Failed to install VAPI SDK: {install_result.stderr}") + print(f"Failed to install VAPI SDK: {install_result.stderr}") + return False + + log("VAPI SDK installed successfully") + except Exception as e: + log(f"Exception installing VAPI SDK: {str(e)}") + log(traceback.format_exc()) + print(f"Error installing VAPI SDK: {e}") + return False + else: + log("VAPI SDK is already installed") + + log("Virtual environment setup complete") + return True + +def check_api_key() -> Optional[str]: + """ + Check if Brave API key is set in .env file. + + Returns: + API key if found, None otherwise + """ + env_path = os.path.join(SCRIPT_DIR, '.env') + api_key = None + + if os.path.exists(env_path): + with open(env_path, 'r') as f: + for line in f: + if line.startswith('VAPI_API_KEY_BRAVE='): + api_key = line.strip().split('=', 1)[1] + break + + if not api_key: + print("VAPI Brave API key not found. Please create a .env file with VAPI_API_KEY_BRAVE=your_api_key") + return None + + return api_key + +def fetch_transcripts(assistant_id: str, api_key: str, + min_duration: int = 0, days_ago: int = None, + limit: int = None, today_only: bool = False) -> Optional[str]: + """ + Fetch all transcripts for the given assistant ID using a Python script executed in the venv. + + Args: + assistant_id: The VAPI assistant ID to fetch transcripts for + api_key: VAPI API key for authentication + min_duration: Minimum duration in seconds for a call to be included + days_ago: Only include calls from the last N days + limit: Maximum number of calls to include + today_only: Only include calls from today + + Returns: + Formatted transcript string or None if an error occurred + """ + log(f"Fetching transcripts for assistant ID: {assistant_id}") + log(f"Filters: min_duration={min_duration}, days_ago={days_ago}, limit={limit}, today_only={today_only}") + + # Create a temporary Python script to execute in the virtual environment + temp_script = os.path.join(SCRIPT_DIR, "_temp_fetch_brave.py") + log(f"Creating temporary script: {temp_script}") + + try: + # Write the script content directly to avoid f-string issues + script_content = ''' +import sys +import traceback +from vapi import Vapi +from datetime import datetime, timedelta + +try: + # Initialize VAPI client + client = Vapi(token="API_KEY_PLACEHOLDER") + + # Fetch calls + response = client.calls.list(assistant_id="ASSISTANT_ID_PLACEHOLDER") + calls = list(response) + + if not calls: + print("No calls found") + sys.exit(0) + + # Sort calls by creation date (oldest first) + # First try to convert to datetime objects for proper sorting + def get_created_date(call): + created_at = getattr(call, "created_at", None) or getattr(call, "createdAt", None) + if hasattr(created_at, 'timestamp'): # It's already a datetime object + return created_at.timestamp() + elif isinstance(created_at, str): + try: + # Try to parse the ISO date string + date_obj = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + return date_obj.timestamp() + except (ValueError, TypeError): + pass + # Fallback to string comparison if date parsing fails + return str(created_at) if created_at else "" + + sorted_calls = sorted(calls, key=get_created_date) + + # Apply filters + filtered_calls = [] + min_duration = MIN_DURATION_PLACEHOLDER # Minimum duration in seconds + days_ago = DAYS_AGO_PLACEHOLDER # Only include calls from the last N days + today_only = TODAY_ONLY_PLACEHOLDER # Only include calls from today + + # Calculate the cutoff date for filtering + now = datetime.now() + if days_ago is not None and days_ago > 0: + cutoff_date = now - timedelta(days=days_ago) + elif today_only: + cutoff_date = datetime(now.year, now.month, now.day, 0, 0, 0) + else: + cutoff_date = None + + for call in sorted_calls: + # Check duration + duration = getattr(call, "duration", 0) or 0 + + # Skip if duration is less than minimum + if duration < min_duration: + continue + + # Check if the call is within the date range + if cutoff_date: + created_at = getattr(call, "created_at", None) or getattr(call, "createdAt", None) + call_datetime = None + + if hasattr(created_at, 'timestamp'): # It's already a datetime object + call_datetime = created_at + elif isinstance(created_at, str): + try: + # Try to parse the ISO date string + call_datetime = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + except (ValueError, TypeError): + pass + + if call_datetime and call_datetime < cutoff_date: + continue + + filtered_calls.append(call) + + # Apply limit if specified + limit = LIMIT_PLACEHOLDER # Maximum number of calls to include + if limit is not None and limit > 0 and len(filtered_calls) > limit: + filtered_calls = filtered_calls[-limit:] # Take the most recent calls + + if not filtered_calls: + print("No calls match the specified filters") + sys.exit(0) + + # Add header with total call count + result = f"# VAPI Call Transcripts ({len(filtered_calls)} total calls)\\n\\n" + result += f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\\n\\n" + + for i, call in enumerate(filtered_calls): + call_id = getattr(call, "id", "unknown") + + # Get the dates (created_at and ended_at) + try: + created_at = getattr(call, "created_at", None) or getattr(call, "createdAt", None) + if hasattr(created_at, 'strftime'): # It's already a datetime object + call_start_date = created_at.strftime('%Y-%m-%d %H:%M:%S') + elif isinstance(created_at, str): + call_start_date = datetime.fromisoformat(created_at.replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S') + else: + call_start_date = "Unknown date" + except (ValueError, TypeError, AttributeError) as e: + call_start_date = "Unknown date" + + try: + ended_at = getattr(call, "ended_at", None) or getattr(call, "endedAt", None) + if hasattr(ended_at, 'strftime'): # It's already a datetime object + call_end_date = ended_at.strftime('%Y-%m-%d %H:%M:%S') + elif isinstance(ended_at, str): + call_end_date = datetime.fromisoformat(ended_at.replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S') + else: + call_end_date = "Unknown end time" + except (ValueError, TypeError, AttributeError) as e: + call_end_date = "Unknown end time" + + # Get duration (calculated or from attribute) + duration = getattr(call, "duration", 0) + if not duration and call_start_date != "Unknown date" and call_end_date != "Unknown end time": + try: + # Try to calculate duration + start_dt = datetime.strptime(call_start_date, '%Y-%m-%d %H:%M:%S') + end_dt = datetime.strptime(call_end_date, '%Y-%m-%d %H:%M:%S') + duration = (end_dt - start_dt).total_seconds() + except: + duration = 0 + + duration_str = f" ({duration:.0f}s)" if duration else "" + + # Get additional metadata + status = getattr(call, "status", "") + ended_reason = getattr(call, "ended_reason", "") + call_type = getattr(call, "type", "") + status_info = f" • Status: {status}" if status else "" + reason_info = f" • Ended: {ended_reason}" if ended_reason else "" + type_info = f" • Type: {call_type}" if call_type else "" + + # Format header with metadata + result += f"# Call {i+1} - {call_start_date}{duration_str} (ID: {call_id})\\n" + result += f"Start: {call_start_date} • End: {call_end_date}{status_info}{reason_info}{type_info}\\n\\n" + + # Check for transcript in the artifact first - this is the most reliable source + artifact = getattr(call, "artifact", None) + transcript_found = False + + if artifact: + # Try to get the pre-formatted transcript from the artifact + artifact_transcript = getattr(artifact, "transcript", None) + if artifact_transcript and isinstance(artifact_transcript, str) and artifact_transcript.strip(): + transcript_found = True + # Clean up the transcript format + clean_transcript = artifact_transcript.replace("AI:", "AI: ").replace("User:", "Human: ") + result += clean_transcript + "\\n\\n" + + # If no transcript was found in the artifact, try the messages + if not transcript_found and artifact: + messages = getattr(artifact, "messages", []) + if messages: + transcript_found = True + # Sort messages by time + sorted_messages = sorted(messages, key=lambda x: getattr(x, "time", 0)) + + for message in sorted_messages: + role = getattr(message, "role", "") + if role == "system": + # Skip system messages + continue + + display_role = "AI" if role == "bot" else "Human" + content = getattr(message, "message", "") + if content and content.strip(): + result += f"{display_role}: {content}\\n\\n" + + # If still no transcript, try other approaches + if not transcript_found: + # Try direct transcript attribute if available + transcript = getattr(call, "transcript", None) + if transcript: + transcript_found = True + # Sort transcript entries by creation time + sorted_entries = sorted(transcript, key=lambda x: getattr(x, "createdAt", "")) + + for entry in sorted_entries: + role = "AI" if getattr(entry, "role", "") == "assistant" else "Human" + content = getattr(entry, "content", "") + if content and content.strip(): + result += f"{role}: {content}\\n\\n" + + # Try the messages attribute directly if still no transcript + if not transcript_found: + messages = getattr(call, "messages", []) + if messages: + transcript_found = True + sorted_messages = sorted(messages, key=lambda x: getattr(x, "createdAt", "")) + + for message in sorted_messages: + role = "AI" if getattr(message, "role", "") == "assistant" else "Human" + content = getattr(message, "content", "") + if content and content.strip(): + result += f"{role}: {content}\\n\\n" + + if not transcript_found: + result += "No transcript available for this call\\n\\n" + + result += "---\\n\\n" + + print(result) +except Exception as e: + print(f"Error: {e}", file=sys.stderr) + print("Traceback:", file=sys.stderr) + traceback.print_exc(file=sys.stderr) + sys.exit(1) +''' + + # Replace placeholders with actual values + script_content = script_content.replace("API_KEY_PLACEHOLDER", api_key) + script_content = script_content.replace("ASSISTANT_ID_PLACEHOLDER", assistant_id) + script_content = script_content.replace("MIN_DURATION_PLACEHOLDER", str(min_duration)) + script_content = script_content.replace("DAYS_AGO_PLACEHOLDER", "None" if days_ago is None else str(days_ago)) + script_content = script_content.replace("LIMIT_PLACEHOLDER", "None" if limit is None else str(limit)) + # Use Python's True/False, not JavaScript's true/false + script_content = script_content.replace("TODAY_ONLY_PLACEHOLDER", str(today_only)) + + with open(temp_script, 'w') as f: + f.write(script_content) + + log("Temporary script created") + + # Execute the temporary script with the virtual environment + log("Executing temporary script") + result = run_with_venv(f"python {temp_script}") + + # Clean up + try: + os.remove(temp_script) + log("Temporary script removed") + except Exception as e: + log(f"Error removing temporary script: {str(e)}") + + if result.returncode != 0: + log(f"Error fetching transcripts, exit code: {result.returncode}") + log(f"Error details: {result.stderr}") + print(f"Error fetching transcripts: {result.stderr}") + return None + + log(f"Successfully fetched transcripts: {len(result.stdout)} characters") + return result.stdout + + except Exception as e: + log(f"Exception in fetch_transcripts: {str(e)}") + log(traceback.format_exc()) + print(f"Error preparing transcript fetch: {e}") + + # Ensure temp file is cleaned up + try: + if os.path.exists(temp_script): + os.remove(temp_script) + log("Cleaned up temporary script after exception") + except: + pass + + return None + +def setup_environment() -> Tuple[bool, Optional[str]]: + """ + Set up the virtual environment and check for API key. + + Returns: + Tuple containing: + - Boolean indicating if setup was successful + - API key if found, None otherwise + """ + log("Setting up environment") + + # Check virtual environment + if not check_venv_setup(): + error_msg = "Failed to set up virtual environment. Please set it up manually:" + log(error_msg) + print(error_msg) + setup_cmd = f"cd {SCRIPT_DIR} && python3 -m venv venv && source venv/bin/activate && pip install vapi_server_sdk" + log(f"Setup command: {setup_cmd}") + print(setup_cmd) + return False, None + + # Check API key + api_key = check_api_key() + if not api_key: + log("API key not found") + print("VAPI API key not found. Please create a .env file with VAPI_API_KEY_BRAVE=your_api_key") + return False, None + + log("Environment setup successful") + return True, api_key + +def find_assistant_id() -> Optional[str]: + """ + Find a VAPI assistant ID from Brave tabs. + + Returns: + Assistant ID if found, None otherwise + """ + log("Looking for VAPI assistant tabs") + print("Looking for VAPI assistant tabs in Brave Browser...") + + # First check foreground tab + try: + foreground_url = get_foreground_tab_url() + if foreground_url: + log(f"Foreground tab URL: {foreground_url}") + foreground_assistant_id = extract_assistant_id(foreground_url) + log(f"Extracted assistant ID from foreground tab: {foreground_assistant_id}") + + if foreground_assistant_id: + print(f"Found assistant ID in foreground tab: {foreground_assistant_id}") + return foreground_assistant_id + else: + log("Could not get URL from foreground Brave tab") + print("Could not access foreground Brave tab.") + except Exception as e: + log(f"Error getting foreground tab URL: {str(e)}") + log(traceback.format_exc()) + print(f"Error accessing Brave foreground tab: {str(e)}") + + # Check all tabs if foreground tab doesn't have an assistant ID + try: + vapi_tabs = find_vapi_tabs() + + if vapi_tabs: + log(f"Found {len(vapi_tabs)} VAPI tabs") + + if len(vapi_tabs) > 1: + log(f"Multiple VAPI tabs found: {vapi_tabs}") + print(f"Found {len(vapi_tabs)} VAPI tabs:") + for i, (url, aid) in enumerate(vapi_tabs): + print(f"{i+1}. {aid} ({url})") + print(f"Using the first one: {vapi_tabs[0][1]}") + + return vapi_tabs[0][1] + else: + log("No VAPI assistant tabs found") + print("No VAPI assistant tabs found in Brave. Please open a VAPI dashboard tab in Brave Browser.") + except Exception as e: + log(f"Error finding VAPI tabs: {str(e)}") + log(traceback.format_exc()) + print(f"Error searching all Brave tabs: {e}") + + # No fallback ID available in open source version + log("No VAPI tabs found and no fallback ID available") + print("Error: No VAPI tabs found. Please open a VAPI dashboard tab in Brave Browser.") + return None + +def process_transcripts(assistant_id: str, api_key: str, + output_file: Optional[str] = None, + min_duration: int = 0, + days_ago: Optional[int] = None, + limit: Optional[int] = None, + today_only: bool = False, + no_paste: bool = False) -> bool: + """ + Fetch, process, and output transcripts for a given assistant ID. + + Args: + assistant_id: The VAPI assistant ID + api_key: The VAPI API key + output_file: Path to save transcripts to instead of clipboard + min_duration: Minimum duration in seconds for a call to be included + days_ago: Only include calls from the last N days + limit: Maximum number of calls to include + today_only: Only include calls from today + no_paste: Don't attempt to paste to the current cursor position + + Returns: + Boolean indicating success or failure + """ + # Additional validation and cleaning for assistant_id to prevent API errors + assistant_id = assistant_id.strip().rstrip(',') + # Validate UUID format + uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$' + if not re.match(uuid_pattern, assistant_id): + error_msg = f"Invalid assistant ID format: {assistant_id}" + log(error_msg) + print(error_msg) + return False + log(f"Processing transcripts for assistant ID: {assistant_id}") + log(f"Output options: output_file={output_file}, no_paste={no_paste}") + print(f"Fetching transcripts for assistant ID: {assistant_id}") + + # Fetch the transcripts with filters + transcripts = fetch_transcripts( + assistant_id, + api_key, + min_duration=min_duration, + days_ago=days_ago, + limit=limit, + today_only=today_only + ) + log(f"Fetch transcripts result: {'Success' if transcripts else 'Failed'}") + + if not transcripts: + error_msg = "Failed to get transcripts" + log(error_msg) + print(error_msg) + return False + + log(f"Transcript length: {len(transcripts)} characters") + + # Save to file if specified + if output_file: + try: + # Make sure the directory exists + output_dir = os.path.dirname(output_file) + if output_dir and not os.path.exists(output_dir): + os.makedirs(output_dir) + + with open(output_file, 'w', encoding='utf-8') as f: + f.write(transcripts) + + log(f"Successfully saved transcripts to file: {output_file}") + print(f"Transcripts saved to file: {output_file}") + return True + except Exception as e: + log(f"Error saving to file: {str(e)}") + log(traceback.format_exc()) + print(f"Error saving to file: {e}") + return False + + # Otherwise, copy to clipboard and paste + try: + copy_to_clipboard(transcripts) + log("Successfully copied to clipboard") + print("Transcripts copied to clipboard!") + + # Also paste the content at current cursor position if not disabled + if not no_paste: + paste_success = paste_from_clipboard() + if paste_success: + log("Successfully pasted content at cursor position") + else: + log("Failed to paste content at cursor position") + print("Note: Could not automatically paste content") + except Exception as e: + log(f"Error copying to clipboard: {str(e)}") + log(traceback.format_exc()) + print(f"Error copying to clipboard: {e}") + return False + + return True + +def is_raycast_environment() -> bool: + """ + Detect if the script is being run from Raycast. + + Returns: + True if running in Raycast environment, False otherwise + """ + # For this script specifically, always return True if the script name + # includes 'vapi-tools' since it's designed primarily as a Raycast script + script_name = os.path.basename(sys.argv[0]) + if script_name.startswith('vapi_transcripts') or script_name.startswith('vapi-transcripts'): + return True + + # Additional checks for Raycast + # Check if any Raycast-specific environment variables are set + raycast_env_vars = [ + 'RAYCAST_APP_NAME', + 'RAYCAST_EXTENSION_ID', + 'RAYCAST_VERSION' + ] + + for var in raycast_env_vars: + if var in os.environ: + return True + + # Check for specific script execution pattern that seems to be causing issues + if len(sys.argv) > 1 and '--' in sys.argv[1]: + return True + + # Check for Raycast-specific command line flags + raycast_indicators = [ + '--enable-source-maps', + '--npm-global/bin/raycast', + 'node --no-warnings', + 'Script Error' + ] + + cmd_line = ' '.join(sys.argv) + for indicator in raycast_indicators: + if indicator in cmd_line: + return True + + return False + +def parse_args() -> argparse.Namespace: + """ + Parse command-line arguments. + + Returns: + Namespace object containing parsed arguments + """ + # Create a namespace object with default values to use when running from Raycast + default_args = argparse.Namespace() + default_args.assistant_id = None + default_args.output = None + default_args.no_paste = False + default_args.min_duration = 0 # No minimum duration by default + default_args.days = None + default_args.today = False + default_args.limit = None + + # Return default args when running from Raycast + if is_raycast_environment(): + log("Detected Raycast environment, using default arguments") + return default_args + + # Otherwise parse command-line arguments + parser = argparse.ArgumentParser( + description="Extract transcripts from VAPI voice assistant calls (Brave browser)" + ) + + # Assistant ID argument (optional - will use hardcoded ID if not provided) + parser.add_argument( + "-a", "--assistant-id", + help="VAPI assistant ID to fetch transcripts for (if not provided, will use hardcoded ID)" + ) + + # Output options + parser.add_argument( + "-o", "--output", + help="Save transcripts to the specified file instead of clipboard" + ) + + parser.add_argument( + "--no-paste", + action="store_true", + help="Don't paste clipboard content automatically (default is to paste)" + ) + + # Filter options + parser.add_argument( + "-d", "--min-duration", + type=int, + default=0, + help="Minimum call duration in seconds (default: 0, includes all calls)" + ) + + parser.add_argument( + "--days", + type=int, + help="Only include calls from the last N days" + ) + + parser.add_argument( + "--today", + action="store_true", + help="Only include calls from today" + ) + + parser.add_argument( + "-l", "--limit", + type=int, + help="Maximum number of calls to include" + ) + + return parser.parse_args() + + +def main() -> int: + """ + Main entry point function. + + Returns: + Exit code (0 for success, 1 for error) + """ + try: + log("Starting main function") + + # Parse command-line arguments + args = parse_args() + log(f"Command-line arguments: {args}") + + # Setup environment (venv and API key) + setup_success, api_key = setup_environment() + if not setup_success: + return 1 + + # Find or use provided assistant ID + assistant_id = args.assistant_id + if not assistant_id: + log("No assistant ID provided via command line, using hardcoded ID") + assistant_id = find_assistant_id() + + if not assistant_id: + error_msg = "No assistant ID found" + log(error_msg) + print(error_msg) + return 1 + + # Process and output transcripts with filters + if not process_transcripts( + assistant_id, + api_key, + output_file=args.output, + min_duration=args.min_duration, + days_ago=args.days, + limit=args.limit, + today_only=args.today, + no_paste=args.no_paste + ): + return 1 + + log("Execution completed successfully") + return 0 + + except Exception as e: + log(f"Unhandled error in main function: {str(e)}") + log(traceback.format_exc()) + print(f"An error occurred: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file