From 44beeacd3e33019866f269e6f6d9a881fb99147a Mon Sep 17 00:00:00 2001 From: 01PrathamS Date: Fri, 5 Dec 2025 17:44:22 +0530 Subject: [PATCH 01/10] audio upload extension with gdrive credentials --- backends/advanced/pyproject.toml | 3 + .../src/advanced_omi_backend/app_config.py | 24 +++++ .../routers/modules/audio_routes.py | 14 +++ .../utils/gdrive_audio_utils.py | 87 +++++++++++++++++++ backends/advanced/webui/src/pages/Upload.tsx | 81 +++++++++++++++++ backends/advanced/webui/src/services/api.ts | 11 ++- 6 files changed, 218 insertions(+), 2 deletions(-) create mode 100644 backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py diff --git a/backends/advanced/pyproject.toml b/backends/advanced/pyproject.toml index 5f635cbb..b021d8d4 100644 --- a/backends/advanced/pyproject.toml +++ b/backends/advanced/pyproject.toml @@ -24,6 +24,9 @@ dependencies = [ "redis>=5.0.0", "rq>=1.16.0", "soundfile>=0.12.1", + "google-api-python-client>=2.0.0", + "google-auth-oauthlib>=1.0.0", + "google-auth-httplib2>=0.2.0", ] [project.optional-dependencies] diff --git a/backends/advanced/src/advanced_omi_backend/app_config.py b/backends/advanced/src/advanced_omi_backend/app_config.py index 4caa70c5..330028a7 100644 --- a/backends/advanced/src/advanced_omi_backend/app_config.py +++ b/backends/advanced/src/advanced_omi_backend/app_config.py @@ -85,6 +85,30 @@ def __init__(self): # Memory service configuration self.memory_service_supports_threshold = self.memory_provider == "friend_lite" + self.gdrive_credentials_path = "data/gdrive_service_account.json" + self.gdrive_scopes = ["https://www.googleapis.com/auth/drive.readonly"] + self._gdrive_service = None + + def get_gdrive_service(self): + """Return Google Drive API client using stored service account.""" + from google.oauth2.service_account import Credentials + from googleapiclient.discovery import build + if self._gdrive_service: + return self._gdrive_service + + if not os.path.exists(self.gdrive_credentials_path): + raise FileNotFoundError( + f"Missing Google Drive credentials at {self.gdrive_credentials_path}" + ) + + creds = Credentials.from_service_account_file( + self.gdrive_credentials_path, + scopes=self.gdrive_scopes, + ) + + self._gdrive_service = build("drive", "v3", credentials=creds) + return self._gdrive_service + # Global configuration instance app_config = AppConfig() diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index 4c0f756b..4ebdcce2 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -12,10 +12,24 @@ from advanced_omi_backend.controllers import audio_controller from advanced_omi_backend.models.user import User from advanced_omi_backend.app_config import get_audio_chunk_dir +from advanced_omi_backend.utils.gdrive_audio_utils import download_audio_files_from_drive router = APIRouter(prefix="/audio", tags=["audio"]) +@router.post("/upload_audio_from_url") +async def upload_audio_from_drive_folder( + drive_folder_id: str = Query(...,alias="url", description="Google Drive Folder ID containing audio files (e.g., the string after /folders/ in the URL)"), + current_user: User = Depends(current_superuser), + device_name: str = Query(default="upload"), + auto_generate_client: bool = Query(default=True), +): + files = await download_audio_files_from_drive(drive_folder_id) + + return await audio_controller.upload_and_process_audio_files( + current_user, files, device_name, auto_generate_client + ) + @router.post("/upload") async def upload_audio_files( current_user: User = Depends(current_superuser), diff --git a/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py new file mode 100644 index 00000000..d664162e --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py @@ -0,0 +1,87 @@ +import os +import io +import tempfile +from typing import List +from starlette.datastructures import UploadFile as StarletteUploadFile +from googleapiclient.http import MediaIoBaseDownload +from advanced_omi_backend.app_config import get_app_config + +AUDIO_EXTENSIONS = (".wav", ".mp3", ".flac", ".ogg") +FOLDER_MIMETYPE = "application/vnd.google-apps.folder" + + +class AudioValidationError(Exception): + pass + + +# ------------------------------------------------------------- +# DOWNLOAD A SINGLE FILE (OAUTH) +# ------------------------------------------------------------- +async def download_and_wrap_drive_file(service, file_item): + file_id = file_item["id"] + name = file_item["name"] + + request = service.files().get_media(fileId=file_id) + + fh = io.BytesIO() + downloader = MediaIoBaseDownload(fh, request) + + done = False + while not done: + status, done = downloader.next_chunk() + + content = fh.getvalue() + + if not content: + raise AudioValidationError(f"Downloaded Google Drive file '{name}' was empty") + + tmp_file = tempfile.NamedTemporaryFile(delete=False) + tmp_file.write(content) + tmp_file.flush() + + # Wrap in Starlette UploadFile to mimic standard uploads + return StarletteUploadFile( + filename=name, + file=open(tmp_file.name, "rb"), + ) + + +# ------------------------------------------------------------- +# LIST + DOWNLOAD FILES IN FOLDER (OAUTH) +# ------------------------------------------------------------- +async def download_audio_files_from_drive(folder_id: str) -> List[StarletteUploadFile]: + if not folder_id: + raise AudioValidationError("Google Drive folder ID is required.") + + service = get_app_config().get_gdrive_service() + + try: + query = f"'{folder_id}' in parents and trashed = false" + + response = service.files().list( + q=query, + fields="files(id, name, mimeType)", + includeItemsFromAllDrives=False, + supportsAllDrives=False, + ).execute() + + all_files = response.get("files", []) + + audio_files = [ + f for f in all_files + if f["name"].lower().endswith(AUDIO_EXTENSIONS) + ] + + if not audio_files: + raise AudioValidationError("No audio files found in folder.") + + wrapped_files = [] + for item in audio_files: + wrapped_files.append(await download_and_wrap_drive_file(service, item)) + + return wrapped_files + + except Exception as e: + if isinstance(e, AudioValidationError): + raise + raise AudioValidationError(f"Google Drive API Error: {repr(e)}") diff --git a/backends/advanced/webui/src/pages/Upload.tsx b/backends/advanced/webui/src/pages/Upload.tsx index 04e7d24c..ae20b39b 100644 --- a/backends/advanced/webui/src/pages/Upload.tsx +++ b/backends/advanced/webui/src/pages/Upload.tsx @@ -15,11 +15,53 @@ export default function Upload() { const [isUploading, setIsUploading] = useState(false) const [dragActive, setDragActive] = useState(false) const [uploadProgress, setUploadProgress] = useState(0) + const [audioUrl, setAudioUrl] = useState('') const { isAdmin } = useAuth() const generateId = () => Math.random().toString(36).substr(2, 9) + const [urlUploadStatus, setUrlUploadStatus] = useState<{ + type: 'success' | 'error' | null + message: string +}>({ type: null, message: '' }) + + + // Handle URL submission + const handleUrlSubmit = async () => { + if (!audioUrl) return + + setIsUploading(true) + setUrlUploadStatus({ type: null, message: '' }) + + try { + const response = await uploadApi.uploadAudioFromUrl({ + drive_folder_id: audioUrl, + device_name: 'upload', + auto_generate_client: true, + }) + + console.log('URL Upload response:', response) + + setUrlUploadStatus({ + type: 'success', + message: `Audio submitted successfully`, + }) + + setAudioUrl('') + } catch (err: any) { + console.error('URL upload failed:', err) + + setUrlUploadStatus({ + type: 'error', + message: + err?.response?.data?.detail || 'Failed to upload audio from URL', + }) + } finally { + setIsUploading(false) + } +} + const handleFileSelect = (selectedFiles: FileList | null) => { if (!selectedFiles) return @@ -152,6 +194,44 @@ export default function Upload() { Upload Audio Files + {/* URL Input */} +
+ + +
+ setAudioUrl(e.target.value)} + placeholder="https://example.com/audio.wav" + className="flex-1 px-3 py-2 border rounded-lg dark:bg-gray-800 dark:text-gray-100" + /> + + +
+ + {/* ✅ Add the status message here */} + {urlUploadStatus.type && ( +
+ {urlUploadStatus.message} +
+ )} +
+ {/* Drop Zone */}
- {/* URL Input */} + + {/* Google Drive Folder Upload */}
- - -
- setAudioUrl(e.target.value)} - placeholder="https://example.com/audio.wav" - className="flex-1 px-3 py-2 border rounded-lg dark:bg-gray-800 dark:text-gray-100" - /> - - -
- - {/* ✅ Add the status message here */} - {urlUploadStatus.type && ( -
- {urlUploadStatus.message} -
- )} -
+ + +
+ setGdriveFolderId(e.target.value)} + placeholder="1AbCdEfGhIjKlMnOpQrStUvWxYz123456" + className="flex-1 px-3 py-2 border rounded-lg dark:bg-gray-800 dark:text-gray-100" + /> + + +
+ {gdriveUploadStatus.type && ( +
+ {gdriveUploadStatus.message} +
+ )} + {/* Drop Zone */}
Supported formats: WAV, MP3, M4A, FLAC

- + handleFileSelect(e.target.files)} className="absolute inset-0 w-full h-full opacity-0 cursor-pointer" /> - + @@ -279,14 +269,14 @@ export default function Upload() {
@@ -317,19 +307,24 @@ export default function Upload() {
- + {uploadFile.status.charAt(0).toUpperCase() + uploadFile.status.slice(1)} - + {uploadFile.status === 'pending' && ( @@ -371,12 +366,12 @@ export default function Upload() {
  • • Audio files will be processed sequentially for transcription and memory extraction
  • -
  • • Processing time varies based on audio length (roughly 3x the audio duration + 60s)
  • -
  • • Large files or multiple files may cause timeout errors - this is normal
  • -
  • • Check the Conversations tab to see processed results
  • +
  • • Processing time varies based on audio length (roughly 3× duration + 60s)
  • +
  • • Large files or multiple files may cause timeout errors
  • +
  • • Check the Conversations tab for processed results
  • • Supported formats: WAV, MP3, M4A, FLAC
) -} \ No newline at end of file +} diff --git a/backends/advanced/webui/src/services/api.ts b/backends/advanced/webui/src/services/api.ts index 3c16f173..86bf655d 100644 --- a/backends/advanced/webui/src/services/api.ts +++ b/backends/advanced/webui/src/services/api.ts @@ -191,12 +191,10 @@ export const uploadApi = { }, }), -uploadAudioFromUrl: (payload: { drive_folder_id: string; device_name?: string; auto_generate_client?: boolean }) => - // 1. Set the POST body to null (or leave it out, though explicit null is cleaner for no body) - api.post('/api/audio/upload_audio_from_url', null, { - // 2. Pass the entire payload object to the 'params' configuration key - params: { - url: payload.drive_folder_id, // IMPORTANT: Use 'url' here to match the backend's alias + uploadFromGDriveFolder: (payload: { gdrive_folder_id: string; device_name?: string; auto_generate_client?: boolean }) => + api.post('/api/audio/upload_audio_from_gdrive', null, { + params: { + gdrive_folder_id: payload.gdrive_folder_id, device_name: payload.device_name, auto_generate_client: payload.auto_generate_client, }, From 6534288efae767d48947009950bab924fe8eac26 Mon Sep 17 00:00:00 2001 From: 01PrathamS Date: Wed, 10 Dec 2025 12:23:19 +0530 Subject: [PATCH 08/10] REFACTOR: validation updated - as per review from CR --- .../advanced_omi_backend/routers/modules/audio_routes.py | 7 +++++-- .../src/advanced_omi_backend/utils/gdrive_audio_utils.py | 9 +++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index e8ba091f..6da6f9b2 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -12,7 +12,7 @@ from advanced_omi_backend.controllers import audio_controller from advanced_omi_backend.models.user import User from advanced_omi_backend.app_config import get_audio_chunk_dir -from advanced_omi_backend.utils.gdrive_audio_utils import download_audio_files_from_drive +from advanced_omi_backend.utils.gdrive_audio_utils import download_audio_files_from_drive, AudioValidationError router = APIRouter(prefix="/audio", tags=["audio"]) @@ -24,7 +24,10 @@ async def upload_audio_from_drive_folder( device_name: str = Query(default="upload"), auto_generate_client: bool = Query(default=True), ): - files = await download_audio_files_from_drive(gdrive_folder_id) + try: + files = await download_audio_files_from_drive(gdrive_folder_id) + except AudioValidationError as e: + raise HTTPException(status_code=400, detail=str(e)) return await audio_controller.upload_and_process_audio_files( current_user, files, device_name, auto_generate_client diff --git a/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py index 2b6efd8d..09c3da17 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py @@ -4,8 +4,9 @@ import logging from starlette.datastructures import UploadFile as StarletteUploadFile from googleapiclient.http import MediaIoBaseDownload -from advanced_omi_backend.app_config import get_app_config from advanced_omi_backend.clients.gdrive_audio_client import get_google_drive_client +from advanced_omi_backend.models.audio_file import AudioFile +from advanced_omi_backend.utils.audio_utils import AudioValidationError logger = logging.getLogger(__name__) @@ -15,9 +16,6 @@ FOLDER_MIMETYPE = "application/vnd.google-apps.folder" -class AudioValidationError(Exception): - pass - async def download_and_wrap_drive_file(service, file_item): file_id = file_item["id"] @@ -100,7 +98,7 @@ async def download_audio_files_from_drive(folder_id: str) -> List[StarletteUploa # synchronous call now (but make the parent function async) wrapped_file = await download_and_wrap_drive_file(service, item) # Attach the file_id to the UploadFile object for later use - setattr(wrapped_file, "gdrive_file_id", file_id) + wrapped_file.gdrive_file_id = file_id wrapped_files.append(wrapped_file) if not wrapped_files and skipped_count > 0: @@ -118,7 +116,6 @@ async def is_drive_file_already_processed(gdrive_file_id: str) -> bool: """Check if an AudioFile document already exists for the given GDrive File ID.""" if not gdrive_file_id: return False - from advanced_omi_backend.models.audio_file import AudioFile existing_file = await AudioFile.find_one( AudioFile.gdrive_file_id == gdrive_file_id ) From 1ff28cbcc4f83c18704feaff10f2e5636d643a70 Mon Sep 17 00:00:00 2001 From: 01PrathamS Date: Mon, 15 Dec 2025 16:32:51 +0530 Subject: [PATCH 09/10] UPDATE: code has been refactore for UUID for diffrent audio upload sources --- .../controllers/audio_controller.py | 9 +- .../advanced_omi_backend/models/audio_file.py | 6 +- .../routers/modules/audio_routes.py | 2 +- .../advanced_omi_backend/utils/audio_utils.py | 4 +- .../utils/dropbox_audio_utils.py | 150 ++++++++++++++++++ .../utils/gdrive_audio_utils.py | 19 +-- 6 files changed, 171 insertions(+), 19 deletions(-) create mode 100644 backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 2f1d5513..06492ff2 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -40,6 +40,7 @@ async def upload_and_process_audio_files( device_name: str = "upload", auto_generate_client: bool = True, folder: str = None, + source: str = "upload" ) -> dict: """ Upload audio files and process them directly. @@ -81,10 +82,12 @@ async def upload_and_process_audio_files( # Read file content content = await file.read() - gdrive_file_id = getattr(file, "gdrive_file_id", None) # Generate audio UUID and timestamp - audio_uuid = str(uuid.uuid4()) + if source == "gdrive": + audio_uuid = getattr(file, "audio_uuid", None) + else: + audio_uuid = str(uuid.uuid4()) timestamp = int(time.time() * 1000) # Determine output directory (with optional subfolder) @@ -100,13 +103,13 @@ async def upload_and_process_audio_files( relative_audio_path, file_path, duration = await write_audio_file( raw_audio_data=content, audio_uuid=audio_uuid, + source=source, client_id=client_id, user_id=user.user_id, user_email=user.email, timestamp=timestamp, chunk_dir=chunk_dir, validate=True, # Validate WAV format, convert stereo→mono - gdrive_file_id=gdrive_file_id ) except AudioValidationError as e: processed_files.append({ diff --git a/backends/advanced/src/advanced_omi_backend/models/audio_file.py b/backends/advanced/src/advanced_omi_backend/models/audio_file.py index 6d2efa4c..3e8cc4b3 100644 --- a/backends/advanced/src/advanced_omi_backend/models/audio_file.py +++ b/backends/advanced/src/advanced_omi_backend/models/audio_file.py @@ -29,6 +29,10 @@ class AudioFile(Document): # Core identifiers audio_uuid: Indexed(str, unique=True) = Field(description="Unique audio identifier") + source: Indexed(str) = Field( + default="upload", + description="Source of the audio (upload, gdrive, etc.)" + ) audio_path: str = Field(description="Path to raw audio file") client_id: Indexed(str) = Field(description="Client device identifier") timestamp: Indexed(int) = Field(description="Unix timestamp in milliseconds") @@ -51,7 +55,6 @@ class AudioFile(Document): description="Speech detection results" ) - gdrive_file_id: Optional[str] = Field(default=None, description="Google Drive file id") class Settings: @@ -61,5 +64,4 @@ class Settings: "client_id", "user_id", "timestamp", - "gdrive_file_id" ] \ No newline at end of file diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index 6da6f9b2..056e7667 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -30,7 +30,7 @@ async def upload_audio_from_drive_folder( raise HTTPException(status_code=400, detail=str(e)) return await audio_controller.upload_and_process_audio_files( - current_user, files, device_name, auto_generate_client + current_user, files, device_name, auto_generate_client, source="gdrive" ) diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py index 84e4e47d..3a3b554d 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_utils.py @@ -102,13 +102,13 @@ async def validate_and_prepare_audio( async def write_audio_file( raw_audio_data: bytes, audio_uuid: str, + source: str, client_id: str, user_id: str, user_email: str, timestamp: int, chunk_dir: Optional[Path] = None, validate: bool = True, - gdrive_file_id: Optional[str] = None, ) -> tuple[str, str, float]: """ Validate, write audio data to WAV file, and create AudioSession database entry. @@ -198,6 +198,7 @@ async def write_audio_file( # Create AudioFile database entry using Beanie model audio_file = AudioFile( audio_uuid=audio_uuid, + source=source, audio_path=wav_filename, client_id=client_id, timestamp=timestamp, @@ -205,7 +206,6 @@ async def write_audio_file( user_email=user_email, has_speech=False, # Will be updated by transcription speech_analysis={}, - gdrive_file_id=gdrive_file_id ) await audio_file.insert() diff --git a/backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py new file mode 100644 index 00000000..51de2c1b --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py @@ -0,0 +1,150 @@ +import io +import tempfile +from typing import List +import logging +from starlette.datastructures import UploadFile as StarletteUploadFile +import requests +from advanced_omi_backend.app_config import get_app_config + +logger = logging.getLogger(__name__) +audio_logger = logging.getLogger("audio_processing") + +AUDIO_EXTENSIONS = (".wav", ".mp3", ".flac", ".ogg", ".m4a") + +class AudioValidationError(Exception): + pass + + +async def download_and_wrap_dropbox_file(file_metadata: dict): + access_token = get_app_config().dropbox_access_token + if not access_token: + raise AudioValidationError("Dropbox access token is missing.") + + file_path = file_metadata["path_lower"] + name = file_metadata["name"] + + headers = { + "Authorization": f"Bearer {access_token}", + "Dropbox-API-Arg": f'{{"path": "{file_path}"}}' + } + + response = requests.post( + "https://content.dropboxapi.com/2/files/download", + headers=headers + ) + + if response.status_code != 200: + raise AudioValidationError( + f"Failed to download Dropbox file {name}: {response.text}" + ) + + content = response.content + if not content: + raise AudioValidationError(f"Downloaded Dropbox file '{name}' was empty") + + tmp_file = tempfile.SpooledTemporaryFile(max_size=10 * 1024 * 1024) + tmp_file.write(content) + tmp_file.seek(0) + + upload_file = StarletteUploadFile(filename=name, file=tmp_file) + + original_close = upload_file.close + + def wrapped_close(): + try: + original_close() + finally: + pass + + upload_file.close = wrapped_close + return upload_file + + +async def download_audio_files_from_dropbox(folder_path: str) -> List[StarletteUploadFile]: + if not folder_path: + raise AudioValidationError("Dropbox folder path is required.") + + access_token = get_app_config().dropbox_access_token + if not access_token: + raise AudioValidationError("Dropbox access token is missing.") + + try: + # ----------------------------------------------- + # Step 1: List files in folder + # ----------------------------------------------- + list_headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json" + } + + list_body = { + "path": folder_path, + "recursive": False + } + + list_res = requests.post( + "https://api.dropboxapi.com/2/files/list_folder", + headers=list_headers, + json=list_body + ) + + if list_res.status_code != 200: + raise AudioValidationError(f"Dropbox API list_folder error: {list_res.text}") + + items = list_res.json().get("entries", []) + + # Filter audio files + audio_files_metadata = [ + f for f in items + if f[".tag"] == "file" and f["name"].lower().endswith(AUDIO_EXTENSIONS) + ] + + if not audio_files_metadata: + raise AudioValidationError("No audio files found in folder.") + + wrapped_files = [] + skipped_count = 0 + + # ----------------------------------------------- + # Step 2: Download each audio file + # ----------------------------------------------- + for item in audio_files_metadata: + dropbox_id = item["id"] # Dropbox file ID + + # Check if already processed + if await is_dropbox_file_already_processed(dropbox_id): + audio_logger.info(f"Skipping already processed file: {item['name']}") + skipped_count += 1 + continue + + wrapped_file = await download_and_wrap_dropbox_file(access_token, item) + + # Attach Dropbox file ID + setattr(wrapped_file, "dropbox_file_id", dropbox_id) + + wrapped_files.append(wrapped_file) + + if not wrapped_files and skipped_count > 0: + raise AudioValidationError( + f"All {skipped_count} files in the folder have already been processed." + ) + + return wrapped_files + + except Exception as e: + if isinstance(e, AudioValidationError): + raise + raise AudioValidationError(f"Dropbox API Error: {e}") from e + + +async def is_dropbox_file_already_processed(dropbox_file_id: str) -> bool: + if not dropbox_file_id: + return False + + from advanced_omi_backend.models.audio_file import AudioFile + + existing_file = await AudioFile.find_one( + AudioFile.dropbox_file_id == dropbox_file_id + ) + + return existing_file is not None diff --git a/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py index 09c3da17..46b0806d 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/gdrive_audio_utils.py @@ -90,15 +90,20 @@ async def download_audio_files_from_drive(folder_id: str) -> List[StarletteUploa file_id = item["id"] # Get the Google Drive File ID # Check if the file is already processed - if await is_drive_file_already_processed(file_id): - audio_logger.info(f"Skipping already processed file: {item['name']}") # Use your logger + existing = await AudioFile.find_one({ + "audio_uuid": file_id, + "source": "gdrive" + }) + + if existing: + audio_logger.info(f"Skipping already processed file: {item['name']}") skipped_count += 1 continue # synchronous call now (but make the parent function async) wrapped_file = await download_and_wrap_drive_file(service, item) # Attach the file_id to the UploadFile object for later use - wrapped_file.gdrive_file_id = file_id + wrapped_file.audio_uuid = file_id wrapped_files.append(wrapped_file) if not wrapped_files and skipped_count > 0: @@ -112,11 +117,3 @@ async def download_audio_files_from_drive(folder_id: str) -> List[StarletteUploa raise AudioValidationError(f"Google Drive API Error: {e}") from e -async def is_drive_file_already_processed(gdrive_file_id: str) -> bool: - """Check if an AudioFile document already exists for the given GDrive File ID.""" - if not gdrive_file_id: - return False - existing_file = await AudioFile.find_one( - AudioFile.gdrive_file_id == gdrive_file_id - ) - return existing_file is not None \ No newline at end of file From 8e5a6b297ba5ba5e7391b00d3dea66052bb38cab Mon Sep 17 00:00:00 2001 From: 01PrathamS Date: Mon, 15 Dec 2025 16:47:47 +0530 Subject: [PATCH 10/10] REFACTOR: updated code as per review --- .../controllers/audio_controller.py | 5 +- .../utils/dropbox_audio_utils.py | 150 ------------------ 2 files changed, 4 insertions(+), 151 deletions(-) delete mode 100644 backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 06492ff2..4810810d 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -86,6 +86,9 @@ async def upload_and_process_audio_files( # Generate audio UUID and timestamp if source == "gdrive": audio_uuid = getattr(file, "audio_uuid", None) + if not audio_uuid: + audio_logger.error(f"Missing audio_uuid for gdrive file: {file.filename}") + audio_uuid = str(uuid.uuid4()) else: audio_uuid = str(uuid.uuid4()) timestamp = int(time.time() * 1000) @@ -139,7 +142,7 @@ async def upload_and_process_audio_files( # Use the relative path returned by write_audio_file (already includes folder prefix if applicable) conversation.audio_path = relative_audio_path await conversation.insert() - conversation_id = conversation.conversation_id # Get the auto-gener ated ID + conversation_id = conversation.conversation_id # Get the auto-generated ID audio_logger.info(f"📝 Created conversation {conversation_id} for uploaded file") diff --git a/backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py b/backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py deleted file mode 100644 index 51de2c1b..00000000 --- a/backends/advanced/src/advanced_omi_backend/utils/dropbox_audio_utils.py +++ /dev/null @@ -1,150 +0,0 @@ -import io -import tempfile -from typing import List -import logging -from starlette.datastructures import UploadFile as StarletteUploadFile -import requests -from advanced_omi_backend.app_config import get_app_config - -logger = logging.getLogger(__name__) -audio_logger = logging.getLogger("audio_processing") - -AUDIO_EXTENSIONS = (".wav", ".mp3", ".flac", ".ogg", ".m4a") - -class AudioValidationError(Exception): - pass - - -async def download_and_wrap_dropbox_file(file_metadata: dict): - access_token = get_app_config().dropbox_access_token - if not access_token: - raise AudioValidationError("Dropbox access token is missing.") - - file_path = file_metadata["path_lower"] - name = file_metadata["name"] - - headers = { - "Authorization": f"Bearer {access_token}", - "Dropbox-API-Arg": f'{{"path": "{file_path}"}}' - } - - response = requests.post( - "https://content.dropboxapi.com/2/files/download", - headers=headers - ) - - if response.status_code != 200: - raise AudioValidationError( - f"Failed to download Dropbox file {name}: {response.text}" - ) - - content = response.content - if not content: - raise AudioValidationError(f"Downloaded Dropbox file '{name}' was empty") - - tmp_file = tempfile.SpooledTemporaryFile(max_size=10 * 1024 * 1024) - tmp_file.write(content) - tmp_file.seek(0) - - upload_file = StarletteUploadFile(filename=name, file=tmp_file) - - original_close = upload_file.close - - def wrapped_close(): - try: - original_close() - finally: - pass - - upload_file.close = wrapped_close - return upload_file - - -async def download_audio_files_from_dropbox(folder_path: str) -> List[StarletteUploadFile]: - if not folder_path: - raise AudioValidationError("Dropbox folder path is required.") - - access_token = get_app_config().dropbox_access_token - if not access_token: - raise AudioValidationError("Dropbox access token is missing.") - - try: - # ----------------------------------------------- - # Step 1: List files in folder - # ----------------------------------------------- - list_headers = { - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/json" - } - - list_body = { - "path": folder_path, - "recursive": False - } - - list_res = requests.post( - "https://api.dropboxapi.com/2/files/list_folder", - headers=list_headers, - json=list_body - ) - - if list_res.status_code != 200: - raise AudioValidationError(f"Dropbox API list_folder error: {list_res.text}") - - items = list_res.json().get("entries", []) - - # Filter audio files - audio_files_metadata = [ - f for f in items - if f[".tag"] == "file" and f["name"].lower().endswith(AUDIO_EXTENSIONS) - ] - - if not audio_files_metadata: - raise AudioValidationError("No audio files found in folder.") - - wrapped_files = [] - skipped_count = 0 - - # ----------------------------------------------- - # Step 2: Download each audio file - # ----------------------------------------------- - for item in audio_files_metadata: - dropbox_id = item["id"] # Dropbox file ID - - # Check if already processed - if await is_dropbox_file_already_processed(dropbox_id): - audio_logger.info(f"Skipping already processed file: {item['name']}") - skipped_count += 1 - continue - - wrapped_file = await download_and_wrap_dropbox_file(access_token, item) - - # Attach Dropbox file ID - setattr(wrapped_file, "dropbox_file_id", dropbox_id) - - wrapped_files.append(wrapped_file) - - if not wrapped_files and skipped_count > 0: - raise AudioValidationError( - f"All {skipped_count} files in the folder have already been processed." - ) - - return wrapped_files - - except Exception as e: - if isinstance(e, AudioValidationError): - raise - raise AudioValidationError(f"Dropbox API Error: {e}") from e - - -async def is_dropbox_file_already_processed(dropbox_file_id: str) -> bool: - if not dropbox_file_id: - return False - - from advanced_omi_backend.models.audio_file import AudioFile - - existing_file = await AudioFile.find_one( - AudioFile.dropbox_file_id == dropbox_file_id - ) - - return existing_file is not None