diff --git a/.codacy.yml b/.codacy.yml
index b00c8590..c338c01d 100644
--- a/.codacy.yml
+++ b/.codacy.yml
@@ -2,6 +2,14 @@ exclude_paths:
- "docs/plans/**"
- "scripts/**"
- "apps/api/tests/**"
+ - "apps/web/src/*.test.ts"
+ - "apps/web/src/*.test.tsx"
+ - "apps/web/src/**/*.test.ts"
+ - "apps/web/src/**/*.test.tsx"
- "apps/web/e2e/**"
+ - "apps/desktop/src/*.test.ts"
+ - "apps/desktop/src/**/*.test.ts"
+ - "services/worker/test_*.py"
+ - "apps/desktop/src/styles.css"
- "apps/desktop/src/text.ts"
- "apps/desktop/src/text.test.ts"
diff --git a/.github/workflows/codecov-analytics.yml b/.github/workflows/codecov-analytics.yml
index 0c91594a..6fe97f72 100644
--- a/.github/workflows/codecov-analytics.yml
+++ b/.github/workflows/codecov-analytics.yml
@@ -27,6 +27,11 @@ jobs:
exit 1
fi
+ - name: Install system dependencies for desktop Rust coverage
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y libgtk-3-dev libwebkit2gtk-4.1-dev librsvg2-dev patchelf
+
- name: Set up Python
uses: actions/setup-python@v6
with:
@@ -37,6 +42,9 @@ jobs:
with:
node-version: '20'
+ - name: Set up Rust
+ uses: dtolnay/rust-toolchain@631a55b12751854ce901bb631d5902ceb48146f7
+
- name: Install Python deps
run: |
python -m venv .venv
@@ -59,32 +67,59 @@ jobs:
--cov=apps/api/app \
--cov=services/worker \
--cov=packages/media-core/src/media_core \
+ --cov=scripts \
--cov-report=xml:coverage/python-coverage.xml \
apps/api/tests services/worker packages/media-core/tests
- name: Run web coverage
working-directory: apps/web
- run: |
- npx vitest run --coverage \
- --coverage.thresholds.lines=0 \
- --coverage.thresholds.functions=0 \
- --coverage.thresholds.branches=0 \
- --coverage.thresholds.statements=0
+ run: npm run test:coverage
- name: Run desktop TS coverage
working-directory: apps/desktop
+ run: npm run test:coverage
+
+ - name: Install cargo-llvm-cov
+ run: cargo install cargo-llvm-cov --locked
+
+ - name: Run desktop Rust coverage
run: |
- npx vitest run --coverage \
- --coverage.thresholds.lines=0 \
- --coverage.thresholds.functions=0 \
- --coverage.thresholds.branches=0 \
- --coverage.thresholds.statements=0
+ mkdir -p coverage
+ cd apps/desktop/src-tauri
+ cargo llvm-cov --workspace --all-features --lcov --output-path ../../../coverage/desktop-rust.lcov
+
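+ # Coverage uploads are split into separate steps below, one per Codacy-independent Codecov flag group.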
+ - name: Upload Python coverage to Codecov
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de
+ with:
+ token: ${{ secrets.CODECOV_TOKEN }}
+ files: coverage/python-coverage.xml
+ flags: api,worker,media-core,scripts
+ fail_ci_if_error: true
+ verbose: true
+
+ - name: Upload web coverage to Codecov
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de
+ with:
+ token: ${{ secrets.CODECOV_TOKEN }}
+ files: apps/web/coverage/lcov.info
+ flags: web
+ fail_ci_if_error: true
+ verbose: true
+
+ - name: Upload desktop TS coverage to Codecov
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de
+ with:
+ token: ${{ secrets.CODECOV_TOKEN }}
+ files: apps/desktop/coverage/lcov.info
+ flags: desktop-ts
+ fail_ci_if_error: true
+ verbose: true
- - name: Upload coverage to Codecov
+ - name: Upload desktop Rust coverage to Codecov
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de
with:
token: ${{ secrets.CODECOV_TOKEN }}
- files: coverage/python-coverage.xml,apps/web/coverage/lcov.info,apps/desktop/coverage/lcov.info
- flags: api,worker,media-core,web,desktop-ts
+ files: coverage/desktop-rust.lcov
+ flags: desktop-rust
fail_ci_if_error: true
verbose: true
diff --git a/.github/workflows/coverage-100.yml b/.github/workflows/coverage-100.yml
index e589a0bb..f0c78e72 100644
--- a/.github/workflows/coverage-100.yml
+++ b/.github/workflows/coverage-100.yml
@@ -59,6 +59,7 @@ jobs:
--cov=apps/api/app \
--cov=services/worker \
--cov=packages/media-core/src/media_core \
+ --cov=scripts \
--cov-report=xml:coverage/python-coverage.xml \
apps/api/tests services/worker packages/media-core/tests
@@ -82,8 +83,11 @@ jobs:
- name: Enforce 100% coverage
run: |
.venv/bin/python scripts/quality/assert_coverage_100.py \
+ --xml "python=coverage/python-coverage.xml" \
--lcov "web=apps/web/coverage/lcov.info" \
--lcov "desktop-ts=apps/desktop/coverage/lcov.info" \
+ --lcov "desktop-rust=coverage/desktop-rust.lcov" \
+ --inventory-root . \
--out-json "coverage-100/coverage.json" \
--out-md "coverage-100/coverage.md"
diff --git a/.github/workflows/desktop-release.yml b/.github/workflows/desktop-release.yml
index dd5a0dd7..4f63dcbb 100644
--- a/.github/workflows/desktop-release.yml
+++ b/.github/workflows/desktop-release.yml
@@ -36,7 +36,9 @@ jobs:
with:
node-version: "20"
cache: "npm"
- cache-dependency-path: apps/desktop/package-lock.json
+ cache-dependency-path: |
+ apps/desktop/package-lock.json
+ apps/web/package-lock.json
- name: Install Rust stable
uses: dtolnay/rust-toolchain@631a55b12751854ce901bb631d5902ceb48146f7
@@ -50,6 +52,14 @@ jobs:
sudo apt-get install -y libwebkit2gtk-4.1-dev librsvg2-dev patchelf
sudo apt-get install -y libappindicator3-dev || sudo apt-get install -y libayatana-appindicator3-dev
+ - name: Install web dependencies
+ working-directory: apps/web
+ run: npm ci
+
+ - name: Build hosted web dist for desktop runtime bundle
+ working-directory: apps/web
+ run: npm run build
+
- name: Install desktop dependencies
working-directory: apps/desktop
run: npm ci
diff --git a/apps/api/app/api.py b/apps/api/app/api.py
index bf88a331..2f7de915 100644
--- a/apps/api/app/api.py
+++ b/apps/api/app/api.py
@@ -11,28 +11,25 @@
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from pathlib import Path
-from typing import Annotated, Any, List, Optional
-from uuid import uuid4
-
-try:
- from celery import Celery
-except ModuleNotFoundError: # pragma: no cover - allows API tests without optional celery install
- class Celery: # type: ignore[override]
- def __init__(self, *args, **kwargs):
- pass
-
- def send_task(self, *_args, **_kwargs):
- raise RuntimeError("Celery is not installed in this environment.")
-from fastapi import APIRouter, Depends, File, Form, Header, Query, Request, UploadFile, status, Response
-from uuid import UUID
+from typing import Any, Iterable, List, Optional, Set
+from uuid import UUID, uuid4
+from fastapi import APIRouter, Depends, File, Form, Header, Query, Request, Response, UploadFile, status
+from fastapi.responses import FileResponse, StreamingResponse
from sqlmodel import Field, Session, SQLModel, select
+from typing_extensions import Annotated
from app.auth_api import PrincipalDep, ensure_default_plans
from app.billing import get_plan_policy
-from app.database import get_session
from app.config import get_settings
+from app.database import get_session
from app.errors import ApiError, ErrorCode, ErrorResponse, conflict, not_found, quota_exceeded, server_error, unauthorized
+from app.local_queue import (
+ diagnostics as local_queue_diagnostics,
+ dispatch_task as dispatch_local_task,
+ is_local_queue_mode,
+ revoke_task as revoke_local_task,
+)
from app.models import (
Job,
JobStatus,
@@ -52,11 +49,76 @@ def send_task(self, *_args, **_kwargs):
)
from app.rate_limit import enforce_rate_limit
from app.security import AuthPrincipal
-from fastapi.responses import FileResponse, StreamingResponse
-
from app.share_links import build_share_token_with_ttl, parse_and_validate_share_token
from app.storage import LocalStorageBackend, get_storage, is_remote_uri
+try:
+ from celery import Celery as _RealCelery
+except ModuleNotFoundError: # pragma: no cover - allows API tests without optional celery install
+ _RealCelery = None
+
+
+_MISSING_CELERY_MESSAGE = "Celery is not installed in this environment."
+
+
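+# Stand-ins used when the optional Celery dependency is not installed; any control/dispatch call raises RuntimeError.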
+class _MissingCeleryControl:
+ @staticmethod
+ def ping(*_args, **_kwargs):
+ raise RuntimeError(_MISSING_CELERY_MESSAGE)
+
+ @staticmethod
+ def revoke(*_args, **_kwargs):
+ raise RuntimeError(_MISSING_CELERY_MESSAGE)
+
+
+class _MissingCelery:
+ def __init__(self, *args, **kwargs):
+ self.control = _MissingCeleryControl()
+
+ @staticmethod
+ def send_task(*_args, **_kwargs):
+ raise RuntimeError(_MISSING_CELERY_MESSAGE)
+
+
+Celery = _RealCelery or _MissingCelery
+try:
+ from kombu.exceptions import OperationalError as _KombuOperationalError
+except ModuleNotFoundError: # pragma: no cover - optional dependency
+ _KombuOperationalError = RuntimeError
+
+try:
+ from redis.exceptions import ConnectionError as _RedisConnectionError
+except ModuleNotFoundError: # pragma: no cover - optional dependency
+ _RedisConnectionError = ConnectionError
+
+KombuOperationalError = _KombuOperationalError
+RedisConnectionError = _RedisConnectionError
+
+_CELERY_BOOTSTRAP_EXCEPTIONS = (
+ RuntimeError,
+ ValueError,
+ TypeError,
+ AttributeError,
+ OSError,
+ ImportError,
+ ModuleNotFoundError,
+ KombuOperationalError,
+ RedisConnectionError,
+ ConnectionError,
+)
+
+_CELERY_RUNTIME_EXCEPTIONS = (
+ RuntimeError,
+ TimeoutError,
+ ValueError,
+ TypeError,
+ AttributeError,
+ OSError,
+ KombuOperationalError,
+ RedisConnectionError,
+ ConnectionError,
+)
+
router = APIRouter(prefix="/api/v1")
logger = logging.getLogger("reframe.api")
_DEFAULT_BINARY_MEDIA_TYPE = "application/octet-stream"
@@ -66,9 +128,9 @@ def send_task(self, *_args, **_kwargs):
@lru_cache(maxsize=1)
-def get_celery_app() -> Celery:
+def get_celery_app() -> Any:
settings = get_settings()
- app = Celery("reframe_api", broker=settings.broker_url, backend=settings.result_backend)
+ app: Any = Celery("reframe_api", broker=settings.broker_url, backend=settings.result_backend)
# Fail fast when broker/backend are unavailable so API diagnostics and tests do not hang.
app.conf.broker_connection_retry_on_startup = False
app.conf.broker_connection_max_retries = 0
@@ -126,6 +188,8 @@ def _resolve_task_queue(task_name: str, *args) -> str:
def enqueue_job(job: Job, task_name: str, *args) -> str:
try:
queue = _resolve_task_queue(task_name, *args)
+ if is_local_queue_mode():
+ return dispatch_local_task(task_name, *args, queue=queue)
result = get_celery_app().send_task(task_name, args=args, queue=queue)
return result.id
except Exception as exc: # pragma: no cover - defensive
@@ -221,7 +285,15 @@ def _safe_redirect_url(url: str) -> str:
def _safe_local_asset_path(*, media_root: str, uri: str) -> Path:
media_root_path = Path(media_root).resolve()
- candidate = LocalStorageBackend(media_root=media_root_path).resolve_local_path(uri or "")
+ try:
+ candidate = LocalStorageBackend(media_root=media_root_path).resolve_local_path(uri or "")
+ except ValueError as exc:
+ raise ApiError(
+ status_code=status.HTTP_403_FORBIDDEN,
+ code=ErrorCode.PERMISSION_DENIED,
+ message="Asset path escapes media root",
+ details={"uri": uri},
+ ) from exc
resolved = candidate.resolve(strict=False)
try:
resolved.relative_to(media_root_path)
@@ -627,34 +699,70 @@ def _truthy_env(name: str) -> bool:
return raw in {"1", "true", "yes", "on"}
+def _append_diag_error(existing: str | None, message: str) -> str:
+ return f"{existing}; {message}" if existing else message
+
+
+def _populate_worker_diag_local_queue(worker_diag: WorkerDiagnostics) -> None:
+ diag = local_queue_diagnostics()
+ worker_diag.ping_ok = bool(diag.get("ping_ok"))
+ worker_diag.workers = sorted({str(item) for item in (diag.get("workers") or []) if item})
+ worker_diag.system_info = diag.get("system_info")
+ worker_diag.error = str(diag.get("error")) if diag.get("error") else None
+
+
+def _iter_worker_pongs(pongs: object) -> Iterable[object]:
+ if isinstance(pongs, (list, tuple)):
+ return pongs
+ return ()
+
+
+def _collect_celery_worker_names(pongs: object) -> List[str]:
+ names: Set[str] = set()
+ for item in _iter_worker_pongs(pongs):
+ if isinstance(item, dict):
+ names.update(str(name) for name in item.keys() if name)
+ return sorted(names)
+
+
+def _populate_worker_diag_celery(worker_diag: WorkerDiagnostics) -> None:
+ try:
+ app = get_celery_app()
+ except _CELERY_BOOTSTRAP_EXCEPTIONS as exc: # pragma: no cover - best effort
+ worker_diag.error = f"Celery unavailable: {exc}"
+ return
+
+ try:
+ pongs = app.control.ping(timeout=1.0)
+ worker_diag.workers = _collect_celery_worker_names(pongs)
+ worker_diag.ping_ok = bool(worker_diag.workers)
+ except _CELERY_RUNTIME_EXCEPTIONS as exc:
+ worker_diag.error = f"Worker ping failed: {exc}"
+ return
+
+ if not worker_diag.ping_ok:
+ return
+
+ try:
+ res = app.send_task("tasks.system_info")
+ worker_diag.system_info = res.get(timeout=3.0)
+ except _CELERY_RUNTIME_EXCEPTIONS as exc:
+ worker_diag.error = _append_diag_error(
+ worker_diag.error,
+ f"Worker diagnostics task failed: {exc}",
+ )
+
+
@router.get("/system/status", response_model=SystemStatusResponse, tags=["System"])
def system_status() -> SystemStatusResponse:
settings = get_settings()
storage = get_storage(media_root=settings.media_root)
worker_diag = WorkerDiagnostics()
- try:
- app = get_celery_app()
- try:
- pongs = app.control.ping(timeout=1.0)
- workers = []
- for item in pongs or []:
- if isinstance(item, dict):
- workers.extend(item.keys())
- worker_diag.workers = sorted(set(workers))
- worker_diag.ping_ok = bool(worker_diag.workers)
- except Exception as exc:
- worker_diag.error = f"Worker ping failed: {exc}"
-
- if worker_diag.ping_ok:
- try:
- res = app.send_task("tasks.system_info")
- worker_diag.system_info = res.get(timeout=3.0)
- except Exception as exc:
- msg = f"Worker diagnostics task failed: {exc}"
- worker_diag.error = f"{worker_diag.error}; {msg}" if worker_diag.error else msg
- except Exception as exc: # pragma: no cover - best effort
- worker_diag.error = f"Celery unavailable: {exc}"
+ if is_local_queue_mode():
+ _populate_worker_diag_local_queue(worker_diag)
+ else:
+ _populate_worker_diag_celery(worker_diag)
return SystemStatusResponse(
api_version=settings.api_version,
@@ -1628,8 +1736,11 @@ def create_workflow_run(payload: WorkflowRunCreateRequest, session: SessionDep,
session.commit()
try:
- result = get_celery_app().send_task("tasks.run_workflow_pipeline", args=[str(run.id)])
- run.task_id = result.id
+ if is_local_queue_mode():
+ run.task_id = dispatch_local_task("tasks.run_workflow_pipeline", str(run.id), queue=_celery_queue_name("CPU"))
+ else:
+ result = get_celery_app().send_task("tasks.run_workflow_pipeline", args=[str(run.id)])
+ run.task_id = result.id
session.add(run)
session.commit()
session.refresh(run)
@@ -1671,7 +1782,10 @@ def cancel_workflow_run(run_id: UUID, session: SessionDep, principal: PrincipalD
session.add(run)
if run.task_id:
try:
- get_celery_app().control.revoke(run.task_id, terminate=False)
+ if is_local_queue_mode():
+ revoke_local_task(run.task_id)
+ else:
+ get_celery_app().control.revoke(run.task_id, terminate=False)
except Exception:
pass
pending_steps = session.exec(
@@ -2857,6 +2971,7 @@ async def upload_asset(
tmp_dir.mkdir(parents=True, exist_ok=True)
tmp_path = tmp_dir / filename
total = 0
+ exceeded = False
with tmp_path.open("wb") as out:
while True:
chunk = await file.read(1024 * 1024)
@@ -2864,15 +2979,19 @@ async def upload_asset(
break
total += len(chunk)
if max_bytes and total > max_bytes:
- tmp_path.unlink(missing_ok=True)
- raise ApiError(
- status_code=status.HTTP_413_CONTENT_TOO_LARGE,
- code=ErrorCode.VALIDATION_ERROR,
- message="Upload too large",
- details={"max_upload_bytes": max_bytes, "uploaded_bytes": total},
- )
+ exceeded = True
+ break
out.write(chunk)
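+ # Unlink and raise only after the with-block has closed the temporary file handle.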
+ if exceeded:
+ tmp_path.unlink(missing_ok=True)
+ raise ApiError(
+ status_code=status.HTTP_413_CONTENT_TOO_LARGE,
+ code=ErrorCode.VALIDATION_ERROR,
+ message="Upload too large",
+ details={"max_upload_bytes": max_bytes, "uploaded_bytes": total},
+ )
+
rel_dir = _scoped_tmp_rel_dir(storage, principal)
uri = storage.write_file(rel_dir=rel_dir, filename=filename, source_path=tmp_path, content_type=file.content_type)
if not isinstance(storage, LocalStorageBackend):
@@ -3015,3 +3134,4 @@ def download_asset(asset_id: UUID, session: SessionDep, principal: PrincipalDep)
def list_style_presets(session: SessionDep, principal: PrincipalDep) -> List[SubtitleStylePreset]:
presets = session.exec(select(SubtitleStylePreset)).all()
return presets
+
diff --git a/apps/api/app/config.py b/apps/api/app/config.py
index 032ba505..fbf9e4ff 100644
--- a/apps/api/app/config.py
+++ b/apps/api/app/config.py
@@ -109,6 +109,11 @@ class Settings(BaseSettings):
validation_alias=AliasChoices("APP_BASE_URL", "REFRAME_APP_BASE_URL"),
description="Public frontend URL used for OAuth/billing redirects.",
)
+ desktop_web_dist: str = Field(
+ default="",
+ validation_alias=AliasChoices("DESKTOP_WEB_DIST", "REFRAME_DESKTOP_WEB_DIST"),
+ description="Optional absolute path to built desktop web assets mounted at '/'.",
+ )
api_base_url: str = Field(
default="http://localhost:8000",
validation_alias=AliasChoices("API_BASE_URL", "REFRAME_API_BASE_URL"),
diff --git a/apps/api/app/local_queue.py b/apps/api/app/local_queue.py
new file mode 100644
index 00000000..0b8c76ac
--- /dev/null
+++ b/apps/api/app/local_queue.py
@@ -0,0 +1,112 @@
+from __future__ import annotations
+
+import os
+from concurrent.futures import Future, ThreadPoolExecutor
+from functools import lru_cache
+from threading import Lock
+from typing import Any, Dict, Optional, Tuple
+from uuid import uuid4
+
+
+def _truthy(value: Optional[str]) -> bool:
+ return (value or "").strip().lower() in {"1", "true", "yes", "on"}
+
+
+def is_local_queue_mode() -> bool:
+ return _truthy(os.getenv("REFRAME_LOCAL_QUEUE_MODE") or os.getenv("LOCAL_QUEUE_MODE"))
+
+
+@lru_cache(maxsize=1)
+def _executor() -> ThreadPoolExecutor:
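+ # One shared thread pool per process; REFRAME_LOCAL_QUEUE_WORKERS controls its size (default 4).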
+ workers_raw = (os.getenv("REFRAME_LOCAL_QUEUE_WORKERS") or "4").strip()
+ try:
+ workers = max(1, int(workers_raw))
+ except ValueError:
+ workers = 4
+ return ThreadPoolExecutor(max_workers=workers, thread_name_prefix="reframe-local-queue")
+
+
+@lru_cache(maxsize=1)
+def _worker_tasks() -> Dict[str, Any]:
+ from services.worker import worker as worker_module
+
+ # Celery task registry gives us the same task names that send_task dispatches.
+ return dict(worker_module.celery_app.tasks)
+
+
+_pending_lock = Lock()
+_pending: Dict[str, Future] = {}
+
+
+def _run_task(task_name: str, args: Tuple[Any, ...]) -> Any:
+ tasks = _worker_tasks()
+ task = tasks.get(task_name)
+ if task is None:
+ raise RuntimeError(f"Local queue task not found: {task_name}")
+ return task.run(*args)
+
+
+def dispatch_task(task_name: str, *args: Any, queue: Optional[str] = None) -> str:
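+ # The queue argument mirrors the Celery dispatch signature; local execution runs everything in one pool.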
+ if not is_local_queue_mode():
+ raise RuntimeError("Local queue mode is not enabled")
+
+ task_id = f"local-{uuid4()}"
+
+ def _wrapped() -> None:
+ _run_task(task_name, args)
+
+ future = _executor().submit(_wrapped)
+ with _pending_lock:
+ _pending[task_id] = future
+
+ def _cleanup(_fut: Future) -> None:
+ with _pending_lock:
+ _pending.pop(task_id, None)
+
+ future.add_done_callback(_cleanup)
+ return task_id
+
+
+def revoke_task(task_id: str) -> bool:
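+ # Future.cancel() only succeeds while the task is still queued; running tasks are not interrupted.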
+ with _pending_lock:
+ future = _pending.get(task_id)
+ if future is None:
+ return False
+ return future.cancel()
+
+
+def diagnostics() -> Dict[str, Any]:
+ if not is_local_queue_mode():
+ return {
+ "ping_ok": False,
+ "workers": [],
+ "system_info": None,
+ "error": "Local queue mode is disabled",
+ }
+
+ info: Optional[Dict[str, Any]] = None
+ error: Optional[str] = None
+ try:
+ task = _worker_tasks().get("tasks.system_info")
+ if task is None:
+ raise RuntimeError("tasks.system_info is unavailable")
+ info = task.run()
+ except (RuntimeError, ValueError, TypeError, AttributeError, OSError) as exc: # pragma: no cover - defensive
+ error = f"Local diagnostics failed: {exc}"
+
+ with _pending_lock:
+ queued = len(_pending)
+
+ workers = ["local-queue"]
+ if queued > 0:
+ workers.append(f"pending:{queued}")
+
+ return {
+ "ping_ok": True,
+ "workers": workers,
+ "system_info": info,
+ "error": error,
+ }
diff --git a/apps/api/app/main.py b/apps/api/app/main.py
index 1e4093fb..f1d03d28 100644
--- a/apps/api/app/main.py
+++ b/apps/api/app/main.py
@@ -1,25 +1,83 @@
+from __future__ import annotations
+
import logging
+import os
+import stat
import time
from contextlib import asynccontextmanager
from pathlib import Path
from uuid import uuid4
from fastapi import FastAPI
+from fastapi import HTTPException
from fastapi import Request
-from fastapi.staticfiles import StaticFiles
+from fastapi.responses import FileResponse
from fastapi.responses import JSONResponse
+from fastapi.staticfiles import StaticFiles
-from app.config import get_settings
-from app.database import create_db_and_tables
from app.api import router as api_router
from app.auth_api import router as auth_router
from app.billing_api import router as billing_router
+from app.cleanup import start_cleanup_loop
from app.collaboration_api import router as collaboration_router
-from app.identity_api import router as identity_router
-from app.publish_api import router as publish_router
+from app.config import get_settings
+from app.database import create_db_and_tables
from app.errors import ApiError, ErrorResponse
-from app.cleanup import start_cleanup_loop
+from app.identity_api import router as identity_router
from app.logging_config import setup_logging
+from app.publish_api import router as publish_router
+
+
+_RESERVED_DESKTOP_PREFIXES = (
+ "api",
+ "docs",
+ "openapi.json",
+ "redoc",
+ "media",
+ "health",
+ "healthz",
+)
+
+
+def _is_reserved_desktop_path(normalized: str) -> bool:
+ return any(
+ normalized == reserved or normalized.startswith(f"{reserved}/")
+ for reserved in _RESERVED_DESKTOP_PREFIXES
+ )
+
+
+def _has_path_traversal(normalized: str) -> bool:
+ segments = [part for part in normalized.replace("\\", "/").split("/") if part]
+ return any(part == ".." for part in segments)
+
+
+def _mount_desktop_web(api_app: FastAPI, desktop_web_dist: str) -> None:
+ raw = (desktop_web_dist or "").strip()
+ if not raw:
+ return
+
+ web_dist = Path(raw).resolve()
+ index_path = web_dist / "index.html"
+ if not os.path.isfile(index_path):
+ return
+ static_files = StaticFiles(directory=str(web_dist), check_dir=False)
+
+ @api_app.get("/", include_in_schema=False)
+ def desktop_index() -> FileResponse:
+ return FileResponse(index_path)
+
+ _ = desktop_index
+
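+ # Catch-all SPA route: reserved and traversal paths return 404, real dist files are served, everything else falls back to index.html.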
+ @api_app.get("/{full_path:path}", include_in_schema=False, responses={404: {"description": "Not Found"}})
+ def desktop_spa(full_path: str) -> FileResponse:
+ normalized = (full_path or "").lstrip("/")
+ if _has_path_traversal(normalized) or _is_reserved_desktop_path(normalized):
+ raise HTTPException(status_code=404)
+
+ candidate, stat_result = static_files.lookup_path(normalized)
+ if stat_result is not None and stat.S_ISREG(stat_result.st_mode):
+ return FileResponse(candidate)
+ return FileResponse(index_path)
+
+ _ = desktop_spa
def create_app() -> FastAPI:
@@ -110,6 +168,8 @@ async def api_error_handler(_, exc: ApiError):
def health() -> dict[str, str]:
return {"status": "ok", "version": settings.api_version}
+ _mount_desktop_web(app, settings.desktop_web_dist)
+
return app
diff --git a/apps/api/app/publish_api.py b/apps/api/app/publish_api.py
index 404a7c9a..40477f5b 100644
--- a/apps/api/app/publish_api.py
+++ b/apps/api/app/publish_api.py
@@ -21,6 +21,7 @@ def send_task(self, *_args, **_kwargs):
from app.auth_api import PrincipalDep
from app.config import get_settings
from app.database import get_session
from app.errors import ApiError, ErrorCode, ErrorResponse, conflict, not_found, unauthorized
+from app.local_queue import dispatch_task as dispatch_local_task, is_local_queue_mode
from app.models import AutomationRunEvent, MediaAsset, PublishConnection, PublishJob
@@ -179,6 +180,8 @@ def _celery_app() -> Celery:
def _dispatch_publish_task(job: PublishJob) -> str:
+ if is_local_queue_mode():
+ return dispatch_local_task("tasks.publish_asset", str(job.id))
result = _celery_app().send_task("tasks.publish_asset", args=[str(job.id)])
return result.id
diff --git a/apps/api/tests/conftest.py b/apps/api/tests/conftest.py
index 584ffef3..678d92ca 100644
--- a/apps/api/tests/conftest.py
+++ b/apps/api/tests/conftest.py
@@ -39,7 +39,7 @@ def test_client(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
media_root.mkdir(parents=True, exist_ok=True)
db_path = tmp_path / "reframe-test.db"
- db_url = f"sqlite:////{str(db_path).lstrip('/')}"
+ db_url = f"sqlite:///{db_path.as_posix()}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("REFRAME_MEDIA_ROOT", str(media_root))
diff --git a/apps/api/tests/test_api_internal_helpers.py b/apps/api/tests/test_api_internal_helpers.py
new file mode 100644
index 00000000..55825d4c
--- /dev/null
+++ b/apps/api/tests/test_api_internal_helpers.py
@@ -0,0 +1,148 @@
+from __future__ import annotations
+
+import asyncio
+from datetime import datetime, timezone
+from pathlib import Path
+from types import SimpleNamespace
+from uuid import uuid4
+
+import pytest
+from sqlmodel import select
+
+from app import api
+from app.errors import ApiError
+from app.models import Job, OrgBudgetPolicy
+
+
+def test_queue_and_gpu_helpers(monkeypatch):
+ monkeypatch.setenv("REFRAME_ENABLE_GPU_QUEUE", "true")
+ monkeypatch.setenv("REFRAME_ASSUME_GPU_FOR_TRANSCRIBE_BACKENDS", "true")
+ monkeypatch.setenv("REFRAME_CELERY_QUEUE_GPU", "gpuq")
+ monkeypatch.setenv("REFRAME_CELERY_QUEUE_CPU", "cpuq")
+ monkeypatch.setenv("REFRAME_CELERY_QUEUE_DEFAULT", "defq")
+
+ assert api._env_truthy("ENABLE_GPU_QUEUE") is True
+ assert api._celery_queue_name("GPU") == "gpuq"
+ assert api._celery_queue_name("CPU") == "cpuq"
+ assert api._celery_queue_name("DEFAULT") == "defq"
+
+ assert api._task_prefers_gpu("tasks.generate_captions", {"backend": "faster_whisper"}) is True
+ assert api._task_prefers_gpu("tasks.transcribe_video", {"device": "cuda"}) is True
+ assert api._task_prefers_gpu("tasks.merge_video_audio", {}) is False
+
+ assert api._resolve_task_queue("tasks.generate_captions", {"backend": "faster_whisper"}) == "gpuq"
+ assert api._resolve_task_queue("tasks.generate_shorts", {}) == "cpuq"
+ assert api._resolve_task_queue("tasks.unknown", {}) == "defq"
+
+
+def test_scope_and_org_access_helpers():
+ org_id = uuid4()
+ principal = SimpleNamespace(org_id=org_id)
+ query = select(Job)
+ scoped = api._scope_query_by_org(query, Job, principal)
+ assert "org_id" in str(scoped)
+
+ api._assert_org_access(principal=principal, entity_org_id=org_id, entity="job", entity_id="1")
+
+ with pytest.raises(ApiError):
+ api._assert_org_access(principal=principal, entity_org_id=uuid4(), entity="job", entity_id="2")
+
+
+def test_idempotency_and_redirect_helpers(monkeypatch):
+ assert api._resolve_idempotency_key(" abc ", None) == "abc"
+ assert api._resolve_idempotency_key(None, "hdr") == "hdr"
+ assert api._resolve_idempotency_key("", "") is None
+
+ with pytest.raises(ApiError):
+ api._resolve_idempotency_key("x" * 129, None)
+
+ assert api._is_forbidden_ip_host("127.0.0.1") is True
+ assert api._is_forbidden_ip_host("8.8.8.8") is False
+
+ assert api._safe_redirect_url("https://example.com/file.txt#frag") == "https://example.com/file.txt"
+
+ with pytest.raises(ApiError):
+ api._safe_redirect_url("http://example.com/file.txt")
+ with pytest.raises(ApiError):
+ api._safe_redirect_url("https://user:pass@example.com/file.txt")
+ with pytest.raises(ApiError):
+ api._safe_redirect_url("https://localhost/file.txt")
+ with pytest.raises(ApiError):
+ api._safe_redirect_url("https://127.0.0.1/file.txt")
+
+
+def test_local_asset_stream_and_path_helpers(tmp_path: Path):
+ media_root = tmp_path / "media"
+ media_root.mkdir(parents=True, exist_ok=True)
+ file_path = media_root / "a.bin"
+ file_path.write_bytes(b"hello")
+
+ resolved = api._safe_local_asset_path(media_root=str(media_root), uri="a.bin")
+ assert resolved == file_path
+
+ with pytest.raises(ApiError):
+ api._safe_local_asset_path(media_root=str(media_root), uri="../escape.bin")
+
+ response = api._stream_local_file(file_path=file_path, mime_type="application/octet-stream")
+ assert response.headers["Content-Disposition"].startswith("attachment;")
+
+ async def _collect() -> bytes:
+ data = bytearray()
+ async for chunk in response.body_iterator:
+ data.extend(chunk)
+ return bytes(data)
+
+ assert asyncio.run(_collect()) == b"hello"
+
+
+def test_cost_budget_and_datetime_helpers(monkeypatch):
+ assert api._coerce_non_negative_float("12.5") == 12.5
+ assert api._coerce_non_negative_float("-1") == 0.0
+ assert api._coerce_non_negative_float("2", scale=0.5) == 1.0
+
+ assert api._extract_estimated_minutes({"expected_minutes": 7}) == 7.0
+ assert api._extract_estimated_minutes({"duration_seconds": 180}) == 3.0
+ assert api._extract_estimated_minutes({}) == 0.0
+
+ assert api._estimate_job_submission_cost_cents(job_type="captions", payload={"duration_seconds": 120}) == 29
+ assert api._estimate_job_submission_cost_cents(job_type="unknown", payload={"estimated_cost_cents": 17}) == 17
+
+ assert api._optional_int(None) is None
+ assert api._optional_int("8") == 8
+
+ assert api._budget_projected_status(current_month_estimated_cost_cents=15, soft_limit=20, hard_limit=30) == "ok"
+ assert api._budget_projected_status(current_month_estimated_cost_cents=25, soft_limit=20, hard_limit=30) == "soft_limit_exceeded"
+ assert api._budget_projected_status(current_month_estimated_cost_cents=35, soft_limit=20, hard_limit=30) == "hard_limit_exceeded"
+
+ dt_naive = datetime(2026, 3, 1, 1, 2, 3)
+ aware = api._coerce_aware_datetime(dt_naive)
+ assert aware is not None and aware.tzinfo is not None
+
+ dt_aware = datetime(2026, 3, 1, 1, 2, 3, tzinfo=timezone.utc)
+ assert api._coerce_aware_datetime(dt_aware) == dt_aware
+
+ assert api._coerce_aware_datetime(None) is None
+
+ principal_admin = SimpleNamespace(role="admin")
+ principal_owner = SimpleNamespace(role="owner")
+ principal_member = SimpleNamespace(role="member")
+ api._require_org_manager_role(principal_admin)
+ api._require_org_manager_role(principal_owner)
+ with pytest.raises(ApiError):
+ api._require_org_manager_role(principal_member)
+
+ policy = OrgBudgetPolicy(
+ org_id=uuid4(),
+ monthly_soft_limit_cents=100,
+ monthly_hard_limit_cents=150,
+ enforce_hard_limit=True,
+ )
+ view = api._serialize_budget_policy(
+ policy=policy,
+ org_id=policy.org_id,
+ current_month_estimated_cost_cents=120,
+ )
+ assert view.projected_status == "soft_limit_exceeded"
+
+ assert api._month_start_utc().day == 1
+
diff --git a/apps/api/tests/test_coverage_wave_api_helpers.py b/apps/api/tests/test_coverage_wave_api_helpers.py
new file mode 100644
index 00000000..425a6d08
--- /dev/null
+++ b/apps/api/tests/test_coverage_wave_api_helpers.py
@@ -0,0 +1,406 @@
+from __future__ import annotations
+
+import builtins
+import logging
+import os
+import sys
+import threading
+import time
+from types import SimpleNamespace
+
+import pytest
+
+from app import billing, cleanup, local_queue, logging_config, storage
+
+
+def _expect(condition: bool, message: str) -> None:
+ if not condition:
+ raise AssertionError(message)
+
+
+@pytest.fixture(autouse=True)
+def _reset_local_queue_state(monkeypatch):
+ local_queue._executor.cache_clear()
+ if hasattr(local_queue._worker_tasks, "cache_clear"):
+ local_queue._worker_tasks.cache_clear()
+ with local_queue._pending_lock:
+ local_queue._pending.clear()
+ monkeypatch.delenv("REFRAME_LOCAL_QUEUE_MODE", raising=False)
+ monkeypatch.delenv("LOCAL_QUEUE_MODE", raising=False)
+ monkeypatch.delenv("REFRAME_LOCAL_QUEUE_WORKERS", raising=False)
+ yield
+ local_queue._executor.cache_clear()
+ if hasattr(local_queue._worker_tasks, "cache_clear"):
+ local_queue._worker_tasks.cache_clear()
+ with local_queue._pending_lock:
+ local_queue._pending.clear()
+
+
+def test_local_queue_truthy_and_mode_detection(monkeypatch):
+ _expect(local_queue._truthy("1"), "Expected truthy helper to treat 1 as true")
+ _expect(not local_queue._truthy("0"), "Expected truthy helper to treat 0 as false")
+ _expect(not local_queue.is_local_queue_mode(), "Expected local queue mode disabled by default")
+ monkeypatch.setenv("REFRAME_LOCAL_QUEUE_MODE", "true")
+ _expect(local_queue.is_local_queue_mode(), "Expected local queue mode via REFRAME_LOCAL_QUEUE_MODE")
+ monkeypatch.delenv("REFRAME_LOCAL_QUEUE_MODE", raising=False)
+ monkeypatch.setenv("LOCAL_QUEUE_MODE", "yes")
+ _expect(local_queue.is_local_queue_mode(), "Expected local queue mode via LOCAL_QUEUE_MODE")
+
+
+def test_local_queue_dispatch_and_revoke(monkeypatch):
+ monkeypatch.setenv("REFRAME_LOCAL_QUEUE_MODE", "true")
+ calls: list[tuple[str, tuple[object, ...]]] = []
+ ready = threading.Event()
+
+ def fake_run_task(task_name: str, args: tuple[object, ...]) -> None:
+ calls.append((task_name, args))
+ ready.set()
+
+ monkeypatch.setattr(local_queue, "_run_task", fake_run_task)
+
+ task_id = local_queue.dispatch_task("tasks.echo", "hello", queue="high")
+ _expect(task_id.startswith("local-"), "Expected local queue task id prefix")
+ _expect(ready.wait(timeout=2), "Expected dispatched task to execute")
+
+ for _ in range(20):
+ with local_queue._pending_lock:
+ if task_id not in local_queue._pending:
+ break
+ time.sleep(0.02)
+
+ _expect(calls == [("tasks.echo", ("hello",))], "Expected _run_task dispatch call")
+ _expect(not local_queue.revoke_task("missing"), "Expected revoke false for missing task")
+
+
+def test_local_queue_dispatch_requires_enabled():
+ with pytest.raises(RuntimeError):
+ local_queue.dispatch_task("tasks.echo")
+
+
+def test_local_queue_diagnostics_enabled_and_error_paths(monkeypatch):
+ monkeypatch.setenv("REFRAME_LOCAL_QUEUE_MODE", "true")
+
+ monkeypatch.setattr(
+ local_queue,
+ "_worker_tasks",
+ lambda: {"tasks.system_info": SimpleNamespace(run=lambda: {"ffmpeg": {"present": True}})},
+ )
+ payload = local_queue.diagnostics()
+ _expect(payload["ping_ok"] is True, "Expected diagnostics ping ok")
+ _expect(payload["system_info"] == {"ffmpeg": {"present": True}}, "Expected system_info payload")
+ _expect(payload["error"] is None, "Expected no diagnostics error")
+
+ monkeypatch.setattr(local_queue, "_worker_tasks", lambda: {})
+ payload_no_task = local_queue.diagnostics()
+ _expect(payload_no_task["ping_ok"] is True, "Expected diagnostics ping true in local mode")
+ _expect(payload_no_task["system_info"] is None, "Expected missing system_info")
+ _expect("unavailable" in str(payload_no_task["error"]), "Expected unavailable error message")
+
+ monkeypatch.delenv("REFRAME_LOCAL_QUEUE_MODE", raising=False)
+ disabled = local_queue.diagnostics()
+ _expect(disabled["ping_ok"] is False, "Expected disabled diagnostics ping false")
+ _expect("disabled" in str(disabled["error"]).lower(), "Expected disabled diagnostics error")
+
+
+def test_local_storage_backend_file_lifecycle(tmp_path):
+ backend = storage.LocalStorageBackend(media_root=tmp_path, public_prefix="/media")
+ uri = backend.write_bytes(rel_dir="org-a/tmp", filename="hello.txt", data=b"hello")
+ _expect(uri == "/media/org-a/tmp/hello.txt", "Expected media URI for written bytes")
+ local_path = backend.resolve_local_path(uri)
+ _expect(local_path.read_bytes() == b"hello", "Expected file contents after write_bytes")
+
+ source = tmp_path / "source.bin"
+ source.write_bytes(b"abc")
+ uri_file = backend.write_file(rel_dir="org-a/out", filename="copy.bin", source_path=source)
+ _expect(uri_file == "/media/org-a/out/copy.bin", "Expected media URI for write_file")
+ _expect(backend.get_download_url(uri_file) == uri_file, "Expected direct URI for local download")
+ _expect(backend.resolve_local_path("/media/org-a/out/copy.bin").read_bytes() == b"abc", "Expected copied bytes")
+
+ backend.delete_uri(uri_file)
+ _expect(not backend.resolve_local_path(uri_file).exists(), "Expected deleted URI to remove file")
+
+ with pytest.raises(ValueError):
+ backend.resolve_local_path("/media/../../escape.txt")
+ with pytest.raises(ValueError):
+ backend.resolve_local_path("https://example.test/file.bin")
+ with pytest.raises(ValueError):
+ backend.create_presigned_upload(rel_dir="a", filename="b", content_type=None, expires_seconds=60)
+ with pytest.raises(ValueError):
+ backend.create_multipart_upload(rel_dir="a", filename="b", content_type=None)
+ with pytest.raises(ValueError):
+ backend.sign_multipart_part(key="k", provider_upload_id="u", part_number=1, expires_seconds=60)
+ with pytest.raises(ValueError):
+ backend.complete_multipart_upload(key="k", provider_upload_id="u", parts=[])
+ with pytest.raises(ValueError):
+ backend.abort_multipart_upload(key="k", provider_upload_id="u")
+
+
+def test_storage_helpers_and_get_storage_modes(monkeypatch, tmp_path):
+ _expect(storage.is_remote_uri("https://example.test/a"), "Expected https URI to be treated as remote")
+ _expect(storage.is_remote_uri("s3://bucket/key"), "Expected s3 URI to be treated as remote")
+ _expect(not storage.is_remote_uri("/media/a"), "Expected local path to be non-remote")
+ _expect(storage._join_key("/a/", "b", "c/") == "a/b/c", "Expected normalized key join")
+
+ monkeypatch.setenv("S3_BUCKET", "")
+ monkeypatch.setenv("REFRAME_STORAGE_BACKEND", "local")
+ local_backend = storage.get_storage(media_root=tmp_path)
+ _expect(isinstance(local_backend, storage.LocalStorageBackend), "Expected local storage backend")
+
+ monkeypatch.setenv("REFRAME_STORAGE_BACKEND", "unknown-backend")
+ with pytest.raises(ValueError):
+ storage.get_storage(media_root=tmp_path)
+
+ monkeypatch.setenv("REFRAME_STORAGE_BACKEND", "s3")
+ monkeypatch.setenv("REFRAME_OFFLINE_MODE", "true")
+ with pytest.raises(RuntimeError):
+ storage.get_storage(media_root=tmp_path)
+
+
+def test_s3_storage_backend_core_paths(monkeypatch, tmp_path):
+ class FakeClient:
+ def __init__(self):
+ self.calls: list[tuple[str, tuple, dict]] = []
+
+ def put_object(self, **kwargs):
+ self.calls.append(("put_object", (), kwargs))
+
+ def upload_file(self, *args, **kwargs):
+ self.calls.append(("upload_file", args, kwargs))
+
+ def generate_presigned_url(self, op, Params=None, ExpiresIn=None):
+ self.calls.append(("generate_presigned_url", (op,), {"Params": Params, "ExpiresIn": ExpiresIn}))
+ return f"https://upload.example/{op}"
+
+ def create_multipart_upload(self, **kwargs):
+ self.calls.append(("create_multipart_upload", (), kwargs))
+ return {"UploadId": "upload-1"}
+
+ def complete_multipart_upload(self, **kwargs):
+ self.calls.append(("complete_multipart_upload", (), kwargs))
+
+ def abort_multipart_upload(self, **kwargs):
+ self.calls.append(("abort_multipart_upload", (), kwargs))
+
+ def delete_object(self, **kwargs):
+ self.calls.append(("delete_object", (), kwargs))
+
+ fake_client = FakeClient()
+
+ class FakeSession:
+ def client(self, *_args, **_kwargs):
+ return fake_client
+
+ class FakeSessionFactory:
+ def Session(self, **_kwargs):
+ return FakeSession()
+
+ monkeypatch.setattr(storage, "_ensure_boto3", lambda: SimpleNamespace(session=FakeSessionFactory()))
+
+ backend = storage.S3StorageBackend(
+ bucket="bucket-a",
+ prefix="tenant",
+ endpoint_url="https://s3.example.test",
+ public_base_url="https://cdn.example.test/assets",
+ public_downloads=True,
+ presign_expires_seconds=300,
+ )
+
+ src = tmp_path / "in.bin"
+ src.write_bytes(b"data")
+
+ uri = backend.write_bytes(rel_dir="org", filename="bytes.bin", data=b"1", content_type="application/octet-stream")
+ _expect(uri == "s3://bucket-a/tenant/org/bytes.bin", "Expected S3 URI for write_bytes")
+ uri_file = backend.write_file(rel_dir="org", filename="file.bin", source_path=src, content_type="application/octet-stream")
+ _expect(uri_file == "s3://bucket-a/tenant/org/file.bin", "Expected S3 URI for write_file")
+ _expect(backend.get_download_url(uri_file) == "https://cdn.example.test/assets/tenant/org/file.bin", "Expected public download URL path")
+ _expect(backend.get_download_url("s3://other-bucket/file") is None, "Expected None for foreign-bucket URI")
+
+ presigned = backend.create_presigned_upload(
+ rel_dir="org",
+ filename="upload.bin",
+ content_type="application/octet-stream",
+ expires_seconds=120,
+ )
+ _expect(presigned["method"] == "PUT", "Expected PUT method for presigned upload")
+
+ multi = backend.create_multipart_upload(rel_dir="org", filename="multi.bin", content_type=None)
+ _expect(multi["upload_id"] == "upload-1", "Expected multipart upload id")
+
+ part = backend.sign_multipart_part(
+ key=multi["key"],
+ provider_upload_id=multi["upload_id"],
+ part_number=1,
+ expires_seconds=60,
+ )
+ _expect(part["method"] == "PUT", "Expected multipart part PUT upload")
+
+ backend.complete_multipart_upload(
+ key=multi["key"],
+ provider_upload_id=multi["upload_id"],
+ parts=[{"part_number": 2, "etag": "b"}, {"part_number": 1, "etag": "a"}],
+ )
+ with pytest.raises(ValueError):
+ backend.complete_multipart_upload(key=multi["key"], provider_upload_id=multi["upload_id"], parts=[{"part_number": 0}])
+
+ backend.abort_multipart_upload(key=multi["key"], provider_upload_id=multi["upload_id"])
+ backend.delete_uri(uri)
+
+ with pytest.raises(ValueError):
+ backend.resolve_local_path(uri)
+
+ ops = [name for name, _args, _kwargs in fake_client.calls]
+ _expect("put_object" in ops, "Expected put_object call")
+ _expect("upload_file" in ops, "Expected upload_file call")
+ _expect("create_multipart_upload" in ops, "Expected create_multipart_upload call")
+ _expect("complete_multipart_upload" in ops, "Expected complete_multipart_upload call")
+ _expect("abort_multipart_upload" in ops, "Expected abort_multipart_upload call")
+
+
+def test_json_formatter_and_setup_logging_paths():
+ formatter = logging_config.JsonFormatter()
+
+ try:
+ raise RuntimeError("boom")
+ except RuntimeError:
+ record = logging.LogRecord(
+ name="reframe.test",
+ level=logging.ERROR,
+ pathname=__file__,
+ lineno=1,
+ msg="failure: %s",
+ args=("x",),
+ exc_info=sys.exc_info(),
+ )
+ record.user_id = "u-1"
+ rendered = formatter.format(record)
+ _expect('"message": "failure: x"' in rendered, "Expected rendered log message")
+ _expect('"user_id": "u-1"' in rendered, "Expected extra log field")
+ _expect("exc_info" in rendered, "Expected formatted exception info")
+
+ logger = logging.getLogger("reframe")
+ setattr(logger, "_reframe_configured", False)
+ for handler in list(logger.handlers):
+ logger.removeHandler(handler)
+
+ logging_config.setup_logging(log_format="plain", log_level="warning")
+ first_count = len(logger.handlers)
+ logging_config.setup_logging(log_format="json", log_level="debug")
+ _expect(len(logger.handlers) == first_count, "Expected setup logging to be idempotent")
+
+
+def test_cleanup_old_files_and_loop_start(tmp_path):
+ target = tmp_path / "tmp"
+ target.mkdir(parents=True, exist_ok=True)
+
+ old_file = target / "old.txt"
+ new_file = target / "new.txt"
+ old_file.write_text("old", encoding="utf-8")
+ new_file.write_text("new", encoding="utf-8")
+
+ old_ts = time.time() - 10_000
+ os.utime(old_file, (old_ts, old_ts))
+
+ cleanup._remove_old_files(target, older_than=cleanup.timedelta(seconds=1))
+ _expect(not old_file.exists(), "Expected old file cleanup")
+ _expect(new_file.exists(), "Expected newer file to remain")
+
+ thread = cleanup.start_cleanup_loop(str(tmp_path), interval_seconds=60, ttl_hours=24)
+ _expect(thread is not None, "Expected cleanup thread")
+ _expect(thread.daemon, "Expected cleanup loop thread daemonized")
+ _expect((tmp_path / "tmp").exists(), "Expected tmp directory creation")
+
+
+def test_billing_plan_and_stripe_paths(monkeypatch):
+ free = billing.get_plan_policy("unknown-plan")
+ _expect(free.code == "free", "Expected free fallback policy")
+ _expect(billing.get_plan_policy("enterprise").seat_limit == 200, "Expected enterprise policy lookup")
+
+ class _Settings:
+ enable_billing = False
+ stripe_secret_key = ""
+
+ monkeypatch.setattr(billing, "get_settings", lambda: _Settings())
+ with pytest.raises(RuntimeError):
+ billing.build_checkout_session(
+ customer_id=None,
+ price_id="price_x",
+ success_url="https://ok",
+ cancel_url="https://cancel",
+ )
+
+ class _SettingsEnabledNoKey:
+ enable_billing = True
+ stripe_secret_key = ""
+
+ monkeypatch.setattr(billing, "get_settings", lambda: _SettingsEnabledNoKey())
+ with pytest.raises(RuntimeError):
+ billing.build_customer_portal_session(customer_id="cus_1", return_url="https://ret")
+
+ class _SettingsEnabled:
+ enable_billing = True
+ stripe_secret_key = "sk_test_123"
+
+ checkout_calls: list[dict] = []
+ modify_calls: list[tuple[str, dict]] = []
+ portal_calls: list[dict] = []
+
+ class _CheckoutSession:
+ @staticmethod
+ def create(**kwargs):
+ checkout_calls.append(kwargs)
+ return {"id": "cs_1", "url": "https://checkout"}
+
+ class _Subscription:
+ @staticmethod
+ def modify(sub_id: str, **kwargs):
+ modify_calls.append((sub_id, kwargs))
+
+ class _PortalSession:
+ @staticmethod
+ def create(**kwargs):
+ portal_calls.append(kwargs)
+ return {"id": "bps_1", "url": "https://portal"}
+
+ fake_stripe = SimpleNamespace(
+ api_key=None,
+ checkout=SimpleNamespace(Session=_CheckoutSession),
+ Subscription=_Subscription,
+ billing_portal=SimpleNamespace(Session=_PortalSession),
+ )
+
+ monkeypatch.setattr(billing, "get_settings", lambda: _SettingsEnabled())
+ monkeypatch.setattr(billing, "_get_stripe", lambda: fake_stripe)
+
+ checkout = billing.build_checkout_session(
+ customer_id="cus_1",
+ price_id="price_1",
+ quantity=0,
+ success_url="https://ok",
+ cancel_url="https://cancel",
+ metadata={"org_id": "x"},
+ )
+ _expect(checkout["id"] == "cs_1", "Expected checkout id")
+ _expect(checkout_calls[0]["line_items"][0]["quantity"] == 1, "Expected quantity coercion to minimum 1")
+
+ billing.update_subscription_seat_limit(subscription_id="sub_1", quantity=0)
+ _expect(modify_calls[0][0] == "sub_1", "Expected subscription id for seat update")
+ _expect(modify_calls[0][1]["items"][0]["quantity"] == 1, "Expected seat quantity minimum to 1")
+
+ portal = billing.build_customer_portal_session(customer_id="cus_1", return_url="https://return")
+ _expect(portal["url"] == "https://portal", "Expected portal URL")
+ _expect(portal_calls[0]["customer"] == "cus_1", "Expected portal customer id")
+
+
+def test_get_stripe_import_error(monkeypatch):
+ real_import = builtins.__import__
+
+ def fake_import(name, *args, **kwargs):
+ if name == "stripe":
+ raise ImportError("missing stripe")
+ return real_import(name, *args, **kwargs)
+
+ monkeypatch.setattr(builtins, "__import__", fake_import)
+ with pytest.raises(RuntimeError):
+ billing._get_stripe()
+
+
diff --git a/apps/api/tests/test_desktop_embedded_web.py b/apps/api/tests/test_desktop_embedded_web.py
new file mode 100644
index 00000000..2d82819b
--- /dev/null
+++ b/apps/api/tests/test_desktop_embedded_web.py
@@ -0,0 +1,70 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+from fastapi.testclient import TestClient
+
+
+def _reset_settings_caches() -> None:
+ from app.api import get_celery_app
+ from app.config import get_settings
+ from app.database import get_engine
+
+ get_settings.cache_clear()
+ get_engine.cache_clear()
+ get_celery_app.cache_clear()
+
+
+def test_desktop_embedded_web_mount_serves_index_and_assets(monkeypatch, tmp_path: Path):
+ web_dist = tmp_path / "web-dist"
+ assets = web_dist / "assets"
+ assets.mkdir(parents=True, exist_ok=True)
+ (web_dist / "index.html").write_text("desktop studio", encoding="utf-8")
+ (assets / "app.js").write_text("console.log('ok');", encoding="utf-8")
+
+ media_root = tmp_path / "media"
+ media_root.mkdir(parents=True, exist_ok=True)
+ monkeypatch.setenv("DATABASE_URL", f"sqlite:///{(tmp_path / 'api.db').as_posix()}")
+ monkeypatch.setenv("REFRAME_MEDIA_ROOT", str(media_root))
+ monkeypatch.setenv("REFRAME_DESKTOP_WEB_DIST", str(web_dist))
+
+ _reset_settings_caches()
+
+ from app.main import create_app
+
+ app = create_app()
+ with TestClient(app) as client:
+ root = client.get("/")
+ assert root.status_code == 200
+ assert "desktop studio" in root.text
+
+ js = client.get("/assets/app.js")
+ assert js.status_code == 200
+ assert "console.log" in js.text
+
+ spa = client.get("/projects/123")
+ assert spa.status_code == 200
+ assert "desktop studio" in spa.text
+
+ traversal = client.get("/%2e%2e/%2e%2e/secret.txt")
+ assert traversal.status_code == 404
+
+ reserved = client.get("/api/_desktop_shell_test")
+ assert reserved.status_code == 404
+
+
+def test_desktop_embedded_web_mount_skips_when_dist_missing(monkeypatch, tmp_path: Path):
+ media_root = tmp_path / "media"
+ media_root.mkdir(parents=True, exist_ok=True)
+ monkeypatch.setenv("DATABASE_URL", f"sqlite:///{(tmp_path / 'api.db').as_posix()}")
+ monkeypatch.setenv("REFRAME_MEDIA_ROOT", str(media_root))
+ monkeypatch.setenv("REFRAME_DESKTOP_WEB_DIST", str(tmp_path / "does-not-exist"))
+
+ _reset_settings_caches()
+
+ from app.main import create_app
+
+ app = create_app()
+ with TestClient(app) as client:
+ assert client.get("/").status_code == 404
+ assert client.get("/health").status_code == 200
diff --git a/apps/api/tests/test_local_queue_mode.py b/apps/api/tests/test_local_queue_mode.py
new file mode 100644
index 00000000..620a2538
--- /dev/null
+++ b/apps/api/tests/test_local_queue_mode.py
@@ -0,0 +1,53 @@
+from __future__ import annotations
+
+from uuid import uuid4
+
+
+def test_enqueue_job_uses_local_queue_when_enabled(monkeypatch):
+ monkeypatch.setenv("REFRAME_LOCAL_QUEUE_MODE", "true")
+
+ import app.api as api_module
+
+ monkeypatch.setattr(api_module, "dispatch_local_task", lambda task_name, *args, queue=None: "local-123")
+
+ class _Job:
+ id = uuid4()
+
+ task_id = api_module.enqueue_job(_Job(), "tasks.generate_captions", "job-id", "asset-id", {"backend": "noop"})
+ assert task_id == "local-123"
+
+
+def test_system_status_prefers_local_queue_diagnostics(monkeypatch):
+ monkeypatch.setenv("REFRAME_LOCAL_QUEUE_MODE", "true")
+
+ import app.api as api_module
+
+ monkeypatch.setattr(
+ api_module,
+ "local_queue_diagnostics",
+ lambda: {
+ "ping_ok": True,
+ "workers": ["local-queue", "pending:2"],
+ "system_info": {"ffmpeg": {"present": True, "version": "6.1"}},
+ "error": None,
+ },
+ )
+
+ status = api_module.system_status()
+ assert status.worker.ping_ok is True
+ assert status.worker.workers == ["local-queue", "pending:2"]
+ assert status.worker.system_info == {"ffmpeg": {"present": True, "version": "6.1"}}
+
+
+def test_publish_dispatch_uses_local_queue_when_enabled(monkeypatch):
+ monkeypatch.setenv("REFRAME_LOCAL_QUEUE_MODE", "true")
+
+ import app.publish_api as publish_api
+
+ monkeypatch.setattr(publish_api, "dispatch_local_task", lambda task_name, *args: "local-publish")
+
+ class _Job:
+ id = uuid4()
+
+ task_id = publish_api._dispatch_publish_task(_Job())
+ assert task_id == "local-publish"
diff --git a/apps/api/tests/test_scripts_coverage_truth_wave.py b/apps/api/tests/test_scripts_coverage_truth_wave.py
new file mode 100644
index 00000000..5bd0b7e5
--- /dev/null
+++ b/apps/api/tests/test_scripts_coverage_truth_wave.py
@@ -0,0 +1,282 @@
+from __future__ import annotations
+
+import json
+import os
+import sys
+from datetime import datetime, timezone
+from importlib.util import module_from_spec, spec_from_file_location
+from pathlib import Path
+
+import pytest
+
+
+def _expect(condition: bool, message: str) -> None:
+ if not condition:
+ raise AssertionError(message)
+
+
+def _repo_root() -> Path:
+ return Path(__file__).resolve().parents[3]
+
+
+def _load_script(path: Path, module_name: str):
+ spec = spec_from_file_location(module_name, path)
+ _expect(spec is not None and spec.loader is not None, f"Unable to load module at {path}")
+ module = module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+def test_generate_ops_digest_helpers_cover_edge_cases(tmp_path):
+ module = _load_script(_repo_root() / "scripts" / "generate_ops_digest.py", "generate_ops_digest_cov_wave")
+
+ # Date parsing and windows
+ now = datetime(2026, 3, 4, tzinfo=timezone.utc)
+ start = datetime(2026, 3, 1, tzinfo=timezone.utc)
+ end = datetime(2026, 3, 8, tzinfo=timezone.utc)
+ _expect(module._parse_dt(None) is None, "Expected None datetime for missing value")
+ _expect(module._parse_dt("bad-date") is None, "Expected None datetime for invalid value")
+ _expect(module._in_window(now, start, end), "Expected datetime in window")
+ _expect(not module._in_window(None, start, end), "Expected None datetime outside window")
+
+ # Link header parsing
+ link = '<https://api.example.test/page=2>; rel="next", <https://api.example.test/page=3>; rel="last"'
+ _expect(module._next_link(link) == "https://api.example.test/page=2", "Expected next link parsing")
+ _expect(module._next_link(None) is None, "Expected None next link for missing header")
+
+ # Failure-rate and percentile helpers
+ _expect(module._failure_rate(0, 0) == 0.0, "Expected 0 failure rate for no runs")
+ _expect(module._failure_rate(1, 4) == 25.0, "Expected ratio failure rate")
+ _expect(module._percentile([], 0.95) == 0.0, "Expected empty percentile fallback")
+ _expect(module._percentile([10, 20, 30], 0) == 10.0, "Expected p0 percentile")
+ _expect(module._percentile([10, 20, 30], 1) == 30.0, "Expected p1 percentile")
+
+ # Duration helper
+ run_ok = {
+ "created_at": "2026-03-02T10:00:00Z",
+ "run_started_at": "2026-03-02T10:00:00Z",
+ "updated_at": "2026-03-02T10:05:00Z",
+ }
+ _expect(module._run_duration_seconds(run_ok) == 300.0, "Expected run duration computation")
+ run_bad = {
+ "run_started_at": "2026-03-02T10:05:00Z",
+ "updated_at": "2026-03-02T10:00:00Z",
+ }
+ _expect(module._run_duration_seconds(run_bad) is None, "Expected invalid backwards duration to be None")
+
+ # Required-check extraction
+ workflow_runs = [
+ {"head_branch": "main", "name": "CI"},
+ {"head_branch": "main", "name": "CodeQL"},
+ {"head_branch": "main", "name": "CI"},
+ ]
+ explicit_policy = {"required_checks": ["CI", "CI", "", "CodeQL"]}
+ _expect(module._required_checks(explicit_policy, workflow_runs) == ["CI", "CodeQL"], "Expected deduped explicit checks")
+ _expect(module._required_checks({}, workflow_runs) == ["CI", "CodeQL"], "Expected discovered checks fallback")
+
+ pass_rate, top_failed = module._required_check_metrics(
+ [
+ {"name": "CI", "conclusion": "success"},
+ {"name": "CI", "conclusion": "failure"},
+ {"name": "CodeQL", "conclusion": "neutral"},
+ {"name": "CodeQL", "conclusion": "cancelled"},
+ ],
+ ["CI", "CodeQL"],
+ )
+ _expect(pass_rate == 25.0, "Expected required-check pass-rate computation")
+ _expect(top_failed and top_failed[0]["name"] in {"CI", "CodeQL"}, "Expected top failed checks list")
+
+ # Deep merge and policy load paths
+ base = {"a": {"x": 1}, "b": 2}
+ merged = module._deep_merge(base, {"a": {"y": 3}, "c": 4})
+ _expect(merged == {"a": {"x": 1, "y": 3}, "b": 2, "c": 4}, "Expected deep merge semantics")
+
+ policy_path = tmp_path / "ops-policy.json"
+ policy_path.write_text(json.dumps({"required_checks": ["CI"], "thresholds": {"main_ci_failure_rate_pct": {"ok_max": 1.0}}}), encoding="utf-8")
+ loaded_policy, loaded = module._load_policy(policy_path)
+ _expect(loaded is True, "Expected policy loaded flag")
+ _expect(loaded_policy["required_checks"] == ["CI"], "Expected loaded required checks")
+
+ # Safe path helper
+ root = tmp_path / "workspace"
+ root.mkdir(parents=True, exist_ok=True)
+ safe = module._safe_workspace_path("docs/out.json", base=root)
+ _expect(safe == root / "docs" / "out.json", "Expected relative output path under workspace")
+ with pytest.raises(ValueError):
+ module._safe_workspace_path("../escape.json", base=root)
+
+
+def test_generate_ops_digest_main_paths(monkeypatch, tmp_path):
+ module = _load_script(_repo_root() / "scripts" / "generate_ops_digest.py", "generate_ops_digest_main_cov_wave")
+
+ repo = tmp_path / "repo"
+ repo.mkdir(parents=True, exist_ok=True)
+ (repo / "docs").mkdir(parents=True, exist_ok=True)
+
+ out_json = repo / "tmp" / "digest.json"
+ out_md = repo / "tmp" / "digest.md"
+ policy = repo / "docs" / "ops-health-policy.json"
+ policy.write_text(json.dumps({"required_checks": ["CI"]}), encoding="utf-8")
+
+ # Missing token path
+ monkeypatch.delenv("GITHUB_TOKEN", raising=False)
+ monkeypatch.delenv("GH_TOKEN", raising=False)
+ def _parse_args_missing_token():
+ return type(
+ "Args",
+ (),
+ {
+ "repo": "Prekzursil/Reframe",
+ "window_days": 7,
+ "out_json": str(out_json.relative_to(repo)),
+ "out_md": str(out_md.relative_to(repo)),
+ "policy": str(policy.relative_to(repo)),
+ "api_base": "https://api.github.com",
+ },
+ )()
+
+ monkeypatch.setattr(module, "parse_args", _parse_args_missing_token)
+
+ prev = Path.cwd()
+ os.chdir(repo)
+ try:
+ with pytest.raises(SystemExit):
+ module.main()
+ finally:
+ os.chdir(prev)
+
+ # Successful run path with fake pagination
+ monkeypatch.setenv("GITHUB_TOKEN", "token")
+
+ pulls = [{"created_at": "2026-03-03T00:00:00Z", "merged_at": "2026-03-03T00:00:00Z"}]
+ issues = [{"labels": [{"name": "agent:ready"}]}]
+ runs = [
+ {
+ "head_branch": "main",
+ "name": "CI",
+ "created_at": "2026-03-03T01:00:00Z",
+ "run_started_at": "2026-03-03T01:00:00Z",
+ "updated_at": "2026-03-03T01:10:00Z",
+ "conclusion": "success",
+ }
+ ]
+ seq = [pulls, issues, {"workflow_runs": runs}]
+ monkeypatch.setattr(module, "_request_json", lambda _url, _token: (seq.pop(0), None))
+
+ prev = Path.cwd()
+ os.chdir(repo)
+ try:
+ rc = module.main()
+ finally:
+ os.chdir(prev)
+
+ _expect(rc == 0, "Expected digest main success")
+ payload = json.loads(out_json.read_text(encoding="utf-8"))
+ _expect(payload["metrics"]["main_ci_failed_runs"] == 0, "Expected successful CI metrics")
+ _expect("Weekly Ops Digest" in out_md.read_text(encoding="utf-8"), "Expected markdown output")
+
+
+def test_assert_coverage_inventory_and_cli_paths(tmp_path, monkeypatch, capsys):
+ module = _load_script(_repo_root() / "scripts" / "quality" / "assert_coverage_100.py", "assert_coverage_cov_wave")
+
+ root = tmp_path / "repo"
+ root.mkdir(parents=True, exist_ok=True)
+
+ # Build tracked inventory files.
+ api_file = root / "apps" / "api" / "app" / "core.py"
+ api_file.parent.mkdir(parents=True, exist_ok=True)
+ api_file.write_text("VALUE = 1\n", encoding="utf-8")
+
+ web_file = root / "apps" / "web" / "src" / "ui.ts"
+ web_file.parent.mkdir(parents=True, exist_ok=True)
+ web_file.write_text("export const VALUE = 1;\n", encoding="utf-8")
+
+ rust_file = root / "apps" / "desktop" / "src-tauri" / "src" / "core.rs"
+ rust_file.parent.mkdir(parents=True, exist_ok=True)
+ rust_file.write_text("pub fn f() {}\n", encoding="utf-8")
+
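+ # Pin the git-tracked file list so the expected coverage inventory is deterministic inside the temp repo.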
+ monkeypatch.setattr(
+ module,
+ "_load_git_tracked_files",
+ lambda _root: [
+ "apps/api/app/core.py",
+ "apps/web/src/ui.ts",
+ "apps/desktop/src-tauri/src/core.rs",
+ ],
+ )
+
+ expected = module._collect_expected_inventory(root)
+ _expect("apps/api/app/core.py" in expected, "Expected API file in inventory")
+ _expect("apps/web/src/ui.ts" in expected, "Expected web file in inventory")
+ _expect("apps/desktop/src-tauri/src/core.rs" in expected, "Expected rust file in inventory")
+
+ # Provide LCOV with one uncovered line to verify findings formatting.
+ lcov = root / "coverage" / "lcov.info"
+ lcov.parent.mkdir(parents=True, exist_ok=True)
+ lcov.write_text(
+ "\n".join(
+ [
+ "TN:",
+ f"SF:{web_file.as_posix()}",
+ "DA:1,1",
+ "DA:2,0",
+ "end_of_record",
+ ]
+ ),
+ encoding="utf-8",
+ )
+
+ stats = module.parse_lcov("web", lcov, base=root)
+ status, findings, metrics = module.evaluate([stats], expected_inventory=expected)
+ _expect(status == "fail", "Expected fail status for uncovered inventory")
+ _expect(metrics["uncovered_files"] >= 1, "Expected uncovered file metric")
+ _expect(any("coverage inventory" in item for item in findings), "Expected inventory findings")
+
+ # Exercise the CLI with --no-inventory-check; coverage is still below 100%, so main() exits 1.
+ json_out = root / "out" / "coverage.json"
+ md_out = root / "out" / "coverage.md"
+ wrapped = module.main.__wrapped__ if hasattr(module.main, "__wrapped__") else None
+ _expect(wrapped is None, "Expected module.main to be undecorated")
+
+ def _parse_args_no_inventory():
+ return type(
+ "Args",
+ (),
+ {
+ "xml": [],
+ "lcov": [f"web={lcov}"],
+ "out_json": str(json_out),
+ "out_md": str(md_out),
+ "inventory_root": str(root),
+ "no_inventory_check": True,
+ },
+ )()
+
+ monkeypatch.setattr(module, "_parse_args", _parse_args_no_inventory)
+ exit_code = module.main()
+ _expect(exit_code == 1, "Expected fail exit code when coverage is below 100")
+ _expect(json_out.is_file(), "Expected JSON artifact output")
+ _expect(md_out.is_file(), "Expected markdown artifact output")
+
+ text = capsys.readouterr().out
+ _expect("Coverage 100 Gate" in text, "Expected CLI markdown output")
+
+
+def test_assert_coverage_path_helpers_and_named_path_parsing(tmp_path):
+ module = _load_script(_repo_root() / "scripts" / "quality" / "assert_coverage_100.py", "assert_cov_helpers_wave")
+
+ with pytest.raises(ValueError):
+ module.parse_named_path("invalid")
+
+ name, path = module.parse_named_path("web=coverage/lcov.info")
+ _expect(name == "web", "Expected parsed name")
+ _expect(path.as_posix() == "coverage/lcov.info", "Expected parsed path")
+
+ root = tmp_path / "workspace"
+ root.mkdir(parents=True, exist_ok=True)
+ safe = module._safe_output_path("coverage/out.json", "fallback.json", base=root)
+ _expect(safe == root / "coverage" / "out.json", "Expected safe path in workspace")
+
+ with pytest.raises(ValueError):
+ module._safe_output_path("../escape.json", "fallback.json", base=root)
\ No newline at end of file
diff --git a/apps/api/tests/test_scripts_coverage_wave.py b/apps/api/tests/test_scripts_coverage_wave.py
new file mode 100644
index 00000000..b3fa7930
--- /dev/null
+++ b/apps/api/tests/test_scripts_coverage_wave.py
@@ -0,0 +1,274 @@
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+import types
+from dataclasses import dataclass
+from enum import Enum
+from importlib.util import module_from_spec, spec_from_file_location
+from pathlib import Path
+
+import pytest
+
+
+def _expect(condition: bool, message: str) -> None:
+ if not condition:
+ raise AssertionError(message)
+
+
+def _repo_root() -> Path:
+ return Path(__file__).resolve().parents[3]
+
+
+def _load_script(name: str):
+ scripts_dir = _repo_root() / "scripts"
+ if str(scripts_dir) not in sys.path:
+ sys.path.insert(0, str(scripts_dir))
+ module_path = scripts_dir / f"{name}.py"
+ spec = spec_from_file_location(name, module_path)
+ _expect(spec is not None and spec.loader is not None, f"Unable to load module spec for {name}")
+ module = module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+def test_release_readiness_helpers_and_safe_paths(tmp_path):
+ module = _load_script("release_readiness_report")
+
+ root = tmp_path / "repo"
+ root.mkdir(parents=True, exist_ok=True)
+
+ safe = module._safe_output_path("docs/out.json", root / "fallback.json", root=root)
+ _expect(safe == (root / "docs" / "out.json"), "Expected relative path to resolve under repo root")
+
+ with pytest.raises(ValueError):
+ module._safe_output_path("../escape.json", root / "fallback.json", root=root)
+
+ rel = module._display_path(root / "docs" / "out.json", root)
+ _expect(rel.replace("\\", "/") == "docs/out.json", "Expected display path relative to repo root")
+
+
+def test_release_readiness_resolve_status_matrix():
+ module = _load_script("release_readiness_report")
+
+ status, blocking, external = module._resolve_status(local_ok=True, updater_ok=True, pyannote_cpu_status="ok")
+ _expect(status == "READY", "Expected READY when all gates are green")
+ _expect(blocking == [], "Expected no blocking reasons")
+ _expect(external == [], "Expected no external blockers")
+
+ status, blocking, external = module._resolve_status(
+ local_ok=True,
+ updater_ok=True,
+ pyannote_cpu_status="blocked_external",
+ )
+ _expect(status == "READY_WITH_EXTERNAL_BLOCKER", "Expected external-blocker readiness status")
+ _expect(blocking == [], "Expected no blocking reasons for external-only blocker")
+ _expect(len(external) == 1, "Expected one external blocker detail")
+
+ status, blocking, _external = module._resolve_status(local_ok=False, updater_ok=False, pyannote_cpu_status="failed")
+ _expect(status == "NOT_READY", "Expected NOT_READY for failed local/updater/pyannote")
+ _expect(len(blocking) == 3, "Expected three blocking reasons")
+
+
+def test_release_readiness_main_ready_with_external_blocker(monkeypatch):
+ module = _load_script("release_readiness_report")
+
+ stamp = "2099-01-01"
+ out_md = "tmp/release-readiness-wave/report.md"
+ out_json = "tmp/release-readiness-wave/report.json"
+
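+ # Serve canned JSON for each stamped artifact: per-platform updater E2E results plus the pyannote benchmark status.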
+ def fake_load_json(path: Path):
+ text = str(path).replace("\\", "/")
+ if text.endswith(f"{stamp}-updater-e2e-windows.json"):
+ return {"success": True, "platform": "windows"}
+ if text.endswith(f"{stamp}-updater-e2e-macos.json"):
+ return {"success": True, "platform": "macos"}
+ if text.endswith(f"{stamp}-updater-e2e-linux.json"):
+ return {"success": True, "platform": "linux"}
+ if text.endswith(f"{stamp}-pyannote-benchmark-status.json"):
+ return {"cpu": {"status": "blocked_external"}, "gpu": {"status": "unknown"}}
+ return None
+
+ monkeypatch.setattr(module, "_load_json", fake_load_json)
+ monkeypatch.setattr(module, "_load_latest_updater_result", lambda _plans, _platform: (None, None))
+ monkeypatch.setattr(module, "_collect_gh_status", lambda _repo: {"ci": {"conclusion": "success"}, "codeql": {"conclusion": "success"}, "branch_protection": {"required_reviews": 1, "linear_history": True}})
+
+ rc = module.main(
+ [
+ "--stamp",
+ stamp,
+ "--verify-exit",
+ "0",
+ "--smoke-hosted-exit",
+ "0",
+ "--smoke-local-exit",
+ "0",
+ "--smoke-security-exit",
+ "0",
+ "--smoke-workflows-exit",
+ "0",
+ "--smoke-perf-cost-exit",
+ "0",
+ "--diarization-exit",
+ "0",
+ "--out-md",
+ out_md,
+ "--out-json",
+ out_json,
+ ]
+ )
+
+ _expect(rc == 0, "Expected READY_WITH_EXTERNAL_BLOCKER to be non-failing")
+ repo = _repo_root()
+ payload = json.loads((repo / out_json).read_text(encoding="utf-8"))
+ _expect(payload["status"] == "READY_WITH_EXTERNAL_BLOCKER", "Expected external blocker status in summary")
+ _expect(payload.get("external_blocker_tracking", {}).get("issue_url"), "Expected external blocker tracking metadata")
+
+
+def test_upsert_ops_digest_main_create_and_update(monkeypatch, tmp_path):
+ module = _load_script("upsert_ops_digest_issue")
+
+ repo = _repo_root()
+ digest_json = repo / "tmp" / "ops-digest" / "digest.json"
+ digest_md = repo / "tmp" / "ops-digest" / "digest.md"
+ out_json = repo / "tmp" / "ops-digest" / "out.json"
+ digest_json.parent.mkdir(parents=True, exist_ok=True)
+ digest_json.write_text(json.dumps({"metrics": {}, "trends": {}, "health": {}}), encoding="utf-8")
+ digest_md.write_text("# digest\n", encoding="utf-8")
+
+ monkeypatch.setenv("GITHUB_TOKEN", "token")
+
+ args = argparse.Namespace(
+ repo="Prekzursil/Reframe",
+ digest_json=str(digest_json.relative_to(repo)),
+ digest_md=str(digest_md.relative_to(repo)),
+ out_json=str(out_json.relative_to(repo)),
+ title="Weekly Ops Digest (rolling)",
+ )
+ monkeypatch.setattr(module, "parse_args", lambda: args)
+
+ calls = {"mode": "create", "posts": 0, "patches": 0}
+
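+ # In "create" mode the issue search returns no match, so the script POSTs a new issue; in "update" mode it finds issue 88 and PATCHes it.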
+ def fake_request(path: str, token: str, method: str = "GET", body=None):
+ _ = token
+ if method == "GET" and path.startswith("/repos/Prekzursil/Reframe/issues?"):
+ return [] if calls["mode"] == "create" else [{"number": 88, "title": "Weekly Ops Digest (rolling)", "html_url": "https://example.test/88"}]
+ if method == "POST":
+ calls["posts"] += 1
+ return {"number": 88, "html_url": "https://example.test/88"}
+ if method == "PATCH":
+ calls["patches"] += 1
+ return {"number": 88, "html_url": "https://example.test/88"}
+ raise AssertionError(f"Unexpected request: {method} {path} body={body!r}")
+
+ monkeypatch.setattr(module, "_request_json", fake_request)
+
+ rc_create = module.main()
+ _expect(rc_create == 0, "Expected create flow to succeed")
+ _expect(calls["posts"] == 1, "Expected one POST for create flow")
+
+ calls["mode"] = "update"
+ rc_update = module.main()
+ _expect(rc_update == 0, "Expected update flow to succeed")
+ _expect(calls["patches"] == 1, "Expected one PATCH for update flow")
+
+
+def test_benchmark_diarization_extract_and_main_paths(monkeypatch, tmp_path, capsys):
+ module = _load_script("benchmark_diarization")
+
+ monkeypatch.setattr(module.shutil, "which", lambda _name: None)
+ with pytest.raises(FileNotFoundError):
+ module._extract_wav_16k_mono(tmp_path / "in.wav", tmp_path / "out.wav")
+
+ recorded = {}
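+ # Stub shutil.which and subprocess.run so the ffmpeg command line is captured instead of executed.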
+ monkeypatch.setattr(module.shutil, "which", lambda _name: "ffmpeg")
+ monkeypatch.setattr(module.subprocess, "run", lambda cmd, check, capture_output, shell: recorded.setdefault("cmd", cmd))
+ module._extract_wav_16k_mono(tmp_path / "in.wav", tmp_path / "out.wav")
+ _expect(recorded["cmd"][0] == "ffmpeg", "Expected ffmpeg command execution")
+
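+ # Stub the media_core path-guard and diarization modules so the benchmark can run without the real dependencies installed.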
+ fake_path_guard = types.ModuleType("media_core.transcribe.path_guard")
+ def _validate_media_input_path(value):
+ return Path(value)
+
+ fake_path_guard.validate_media_input_path = _validate_media_input_path
+
+ class _Backend(Enum):
+ PYANNOTE = "pyannote"
+ SPEECHBRAIN = "speechbrain"
+
+ @dataclass
+ class _Config:
+ backend: _Backend
+ model: str
+ huggingface_token: str | None
+ min_segment_duration: float
+
+ fake_diarize = types.ModuleType("media_core.diarize")
+ fake_diarize.DiarizationBackend = _Backend
+ fake_diarize.DiarizationConfig = _Config
+ fake_diarize.diarize_audio = lambda _wav, _cfg: ["s1", "s2"]
+
+ monkeypatch.setitem(sys.modules, "media_core.transcribe.path_guard", fake_path_guard)
+ monkeypatch.setitem(sys.modules, "media_core.diarize", fake_diarize)
+
+ input_file = tmp_path / "sample.wav"
+ input_file.write_bytes(b"wav")
+ monkeypatch.setattr(module, "_extract_wav_16k_mono", lambda _inp, out: out.write_bytes(b"wav16"))
+ monkeypatch.setattr(module, "_get_peak_rss_mb", lambda: 123.4)
+
+ rc_blocked = module.main([
+ str(input_file),
+ "--backend",
+ "pyannote",
+ "--model",
+ "pyannote/speaker-diarization-3.1",
+ ])
+ _expect(rc_blocked == 2, "Expected missing HF token path to return 2")
+
+ rc_ok = module.main([
+ str(input_file),
+ "--backend",
+ "speechbrain",
+ "--runs",
+ "2",
+ "--format",
+ "md",
+ ])
+ _expect(rc_ok == 0, "Expected benchmark main success for speechbrain backend")
+ _expect("Diarization benchmark" in capsys.readouterr().out, "Expected markdown output")
+
+
+def test_transcribe_main_module_paths(monkeypatch, tmp_path, capsys):
+ module_path = _repo_root() / "packages" / "media-core" / "src" / "media_core" / "transcribe" / "__main__.py"
+ spec = spec_from_file_location("media_core.transcribe.__main__", module_path)
+ _expect(spec is not None and spec.loader is not None, "Expected __main__ module spec")
+ module = module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+
+ media_file = tmp_path / "audio.wav"
+ media_file.write_bytes(b"audio")
+
+ monkeypatch.setattr(module, "parse_args", lambda: argparse.Namespace(input=str(media_file), language="en", backend="noop", model="whisper-1", device="cpu"))
+ monkeypatch.setattr(module, "validate_media_input_path", lambda _p: media_file)
+
+ class _Result:
+ def model_dump(self):
+ return {"text": "ok", "words": []}
+
+ monkeypatch.setattr(module, "transcribe_noop", lambda _path, _cfg: _Result())
+ _expect(module.main() == 0, "Expected noop backend path to pass")
+
+ monkeypatch.setattr(module, "parse_args", lambda: argparse.Namespace(input=str(media_file), language=None, backend="invalid", model="m", device=None))
+ _expect(module.main() == 1, "Expected invalid backend to fail")
+
+ monkeypatch.setattr(module, "parse_args", lambda: argparse.Namespace(input=str(media_file), language="en", backend="noop", model="m", device=None))
+ monkeypatch.setattr(module, "validate_media_input_path", lambda _p: (_ for _ in ()).throw(ValueError("bad path")))
+ _expect(module.main() == 1, "Expected invalid input path to fail")
+
+ monkeypatch.setattr(module, "validate_media_input_path", lambda _p: media_file)
+ monkeypatch.setattr(module, "transcribe_noop", lambda _path, _cfg: (_ for _ in ()).throw(RuntimeError("boom")))
+ _expect(module.main() == 1, "Expected transcription exception path to fail")
+ _expect("Tip: use backend 'noop'" in capsys.readouterr().err, "Expected offline tip on transcription failure")
diff --git a/apps/api/tests/test_scripts_misc_tooling.py b/apps/api/tests/test_scripts_misc_tooling.py
new file mode 100644
index 00000000..dda7a87b
--- /dev/null
+++ b/apps/api/tests/test_scripts_misc_tooling.py
@@ -0,0 +1,255 @@
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+import types
+from dataclasses import dataclass
+from importlib.util import module_from_spec, spec_from_file_location
+from pathlib import Path
+
+
+def _expect(condition: bool, message: str) -> None:
+ if not condition:
+ raise AssertionError(message)
+
+
+def _repo_root() -> Path:
+ return Path(__file__).resolve().parents[3]
+
+
+def _load_script(name: str):
+ scripts_dir = _repo_root() / "scripts"
+ if str(scripts_dir) not in sys.path:
+ sys.path.insert(0, str(scripts_dir))
+ module_path = scripts_dir / f"{name}.py"
+ spec = spec_from_file_location(name, module_path)
+ _expect(spec is not None and spec.loader is not None, f"Unable to load module spec for {name}")
+ module = module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+def test_prefetch_whisper_model_missing_dependency(capsys):
+ module = _load_script("prefetch_whisper_model")
+
+ rc = module.main(["--model", "large-v3"])
+
+ _expect(rc == 2, "Expected missing faster-whisper dependency to return 2")
+ _expect("faster-whisper is not installed" in capsys.readouterr().err, "Expected dependency error message")
+
+
+def test_prefetch_whisper_model_success(monkeypatch, capsys):
+ module = _load_script("prefetch_whisper_model")
+
+ calls: list[tuple[str, dict[str, str]]] = []
+
+ class FakeWhisperModel:
+ def __init__(self, model_name: str, **kwargs):
+ calls.append((model_name, kwargs))
+
+ fake_backend = types.ModuleType("media_core.transcribe.backends.faster_whisper")
+ fake_backend._normalize_model_name = lambda value: f"normalized-{value}"
+
+ fake_fw = types.ModuleType("faster_whisper")
+ fake_fw.WhisperModel = FakeWhisperModel
+
+ monkeypatch.setitem(sys.modules, "media_core.transcribe.backends.faster_whisper", fake_backend)
+ monkeypatch.setitem(sys.modules, "faster_whisper", fake_fw)
+
+ rc = module.main(["--model", "large-v3", "--device", "cpu"])
+
+ _expect(rc == 0, "Expected successful prefetch")
+ _expect(calls == [("normalized-large-v3", {"device": "cpu"})], "Expected normalized model and device kwargs")
+ _expect("Prefetching faster-whisper model" in capsys.readouterr().out, "Expected prefetch output")
+
+
+def test_install_argos_pack_paths(monkeypatch, capsys):
+ module = _load_script("install_argos_pack")
+
+ class FakePackage:
+ def __init__(self, src: str, tgt: str):
+ self.from_code = src
+ self.to_code = tgt
+
+ def download(self):
+ return "/tmp/fake.argosmodel"
+
+ class FakeArgos:
+ def __init__(self):
+ self.updated = False
+ self.installed_path = ""
+
+ def update_package_index(self):
+ self.updated = True
+
+ def get_available_packages(self):
+ return [FakePackage("en", "es"), FakePackage("en", "fr")]
+
+ def install_from_path(self, path: str):
+ self.installed_path = path
+
+ fake_argos = FakeArgos()
+ monkeypatch.setattr(module, "_ensure_argos", lambda: fake_argos)
+
+ _expect(module.main(["--list"]) == 0, "Expected list flow to pass")
+ _expect("en->es" in capsys.readouterr().out, "Expected list output to include en->es")
+
+ _expect(module.main([]) == 2, "Expected missing src/tgt to fail")
+ _expect("--src and --tgt are required" in capsys.readouterr().err, "Expected src/tgt requirement message")
+
+ _expect(module.main(["--src", "en", "--tgt", "de"]) == 3, "Expected unavailable pair to fail")
+ _expect("No Argos pack found for en->de" in capsys.readouterr().err, "Expected no-pack message")
+
+ _expect(module.main(["--src", "en", "--tgt", "es"]) == 0, "Expected install flow to pass")
+ _expect(fake_argos.installed_path == "/tmp/fake.argosmodel", "Expected install_from_path invocation")
+
+
+def test_generate_benchmark_sample_main_and_path_guard(monkeypatch, tmp_path):
+ module = _load_script("generate_benchmark_sample")
+
+ _expect(module._sample_value(2.2) == 0.0, "Expected silent bucket sample to be zero")
+
+ repo = tmp_path / "repo"
+ repo.mkdir(parents=True, exist_ok=True)
+ try:
+ module._safe_output_path("../escape.wav", base=repo)
+ raise AssertionError("Expected ValueError for escaping output path")
+ except ValueError:
+ pass
+
+ out_wav = tmp_path / "sample.wav"
+ args = argparse.Namespace(out="samples/sample.wav", duration=0.02, sample_rate=8000)
+ monkeypatch.setattr(module.argparse.ArgumentParser, "parse_args", lambda _self: args)
+ monkeypatch.setattr(module, "_safe_output_path", lambda *_args, **_kwargs: out_wav)
+
+ rc = module.main()
+
+ _expect(rc == 0, "Expected benchmark sample generation to succeed")
+ _expect(out_wav.is_file(), "Expected WAV output file to exist")
+ _expect(out_wav.stat().st_size > 44, "Expected WAV file with audio payload")
+
+
+def test_download_whispercpp_model_behaviors(monkeypatch, tmp_path, capsys):
+ module = _load_script("download_whispercpp_model")
+
+ _expect(module._normalize_filename("large-v3") == "ggml-large-v3.bin", "Expected normalized ggml filename")
+ _expect(module._normalize_filename("ggml-base.en.bin") == "ggml-base.en.bin", "Expected pre-prefixed filename")
+
+ try:
+ module._normalize_filename("bad*name")
+ raise AssertionError("Expected invalid filename to fail")
+ except ValueError:
+ pass
+
+ out_dir = tmp_path / "models"
+ out_dir.mkdir(parents=True, exist_ok=True)
+ existing = out_dir / "ggml-large-v3.bin"
+ existing.write_text("ready", encoding="utf-8")
+
+ monkeypatch.setattr(module, "_resolve_output_dir", lambda *_args, **_kwargs: out_dir)
+ rc_existing = module.main(["--model", "large-v3"])
+ _expect(rc_existing == 0, "Expected existing file fast-path")
+ _expect("Already present" in capsys.readouterr().out, "Expected already-present message")
+
+ rc_bad_url = module.main(["--base-url", "http://example.com"])
+ _expect(rc_bad_url == 2, "Expected non-https base URL to fail")
+
+ downloaded = out_dir / "ggml-small.bin"
+ monkeypatch.setattr(module, "_download", lambda _url, dest: dest.write_text("model", encoding="utf-8"))
+ rc_download = module.main(["--model", "small", "--force"])
+ _expect(rc_download == 0, "Expected download path to succeed")
+ _expect(downloaded.is_file(), "Expected downloaded model file")
+
+
+def test_verify_desktop_updater_release_main_paths(monkeypatch, capsys):
+ module = _load_script("verify_desktop_updater_release")
+
+ payload = {
+ "version": "0.1.8",
+ "pub_date": "2026-03-03T00:00:00Z",
+ "platforms": {
+ "windows-x86_64": {
+ "url": "https://example.com/app.exe",
+ "signature": "A" * 40,
+ }
+ },
+ }
+ monkeypatch.setattr(module, "_fetch_bytes", lambda _url: json.dumps(payload).encode("utf-8"))
+ monkeypatch.setattr(module, "_head_with_retries", lambda _url: 200)
+
+ rc_ok = module.main(["--endpoint", "https://example.com/latest.json"])
+ _expect(rc_ok == 0, "Expected updater release verification to pass")
+ _expect("OK: updater JSON looks valid" in capsys.readouterr().out, "Expected success output")
+
+ monkeypatch.setattr(module, "_head_with_retries", lambda _url: 404)
+ rc_fail = module.main(["--endpoint", "https://example.com/latest.json"])
+ _expect(rc_fail == 1, "Expected inaccessible platform URL to fail")
+
+
+def test_verify_hf_model_access_paths(monkeypatch, tmp_path):
+ module = _load_script("verify_hf_model_access")
+
+ dotenv_repo = tmp_path / "repo"
+ dotenv_repo.mkdir(parents=True, exist_ok=True)
+ (dotenv_repo / ".env").write_text("HF_TOKEN=token-from-env-file\n", encoding="utf-8")
+
+ token = module._load_token("", dotenv_repo)
+ _expect(token == "token-from-env-file", "Expected token lookup from .env")
+
+ missing = module._probe("https://huggingface.co/x/resolve/main/config.yaml", "", model="x")
+ _expect(missing.status == "missing_token", "Expected missing-token probe state")
+
+ @dataclass
+ class _FakeResult:
+ timestamp_utc: str
+ status: str
+ model: str
+ url: str
+ http_status: int | None
+ error: str | None
+
+ monkeypatch.setattr(module, "_probe", lambda _url, _token, model: _FakeResult("ts", "ok", model, _url, 200, None))
+ rc_ok = module.main(["--token", "abc", "--model", "pyannote/speaker-diarization-3.1"])
+ _expect(rc_ok == 0, "Expected hf probe main success")
+
+ monkeypatch.setattr(module, "_probe", lambda _url, _token, model: _FakeResult("ts", "blocked_403", model, _url, 403, "blocked"))
+ rc_blocked = module.main(["--token", "abc", "--model", "pyannote/speaker-diarization-3.1"])
+ _expect(rc_blocked == 4, "Expected blocked status exit code")
+
+
+def test_desktop_updater_e2e_paths(monkeypatch, tmp_path):
+ module = _load_script("desktop_updater_e2e")
+
+ repo = tmp_path / "repo"
+ (repo / "scripts").mkdir(parents=True, exist_ok=True)
+ monkeypatch.setattr(module, "_repo_root", lambda: repo)
+
+ verify_failure = module.subprocess.CompletedProcess(args=["verify"], returncode=1, stdout="", stderr="err")
+
+ def run_fail(cmd, *, cwd, env=None):
+ _ = (cmd, cwd, env)
+ return verify_failure
+
+ monkeypatch.setattr(module, "_run", run_fail)
+ rc_fail = module.main(["--platform", "linux"])
+ _expect(rc_fail == 1, "Expected verify failure to fail wrapper")
+
+ verify_ok = module.subprocess.CompletedProcess(args=["verify"], returncode=0, stdout="ok", stderr="")
+ helper_ok = module.subprocess.CompletedProcess(
+ args=["helper"],
+ returncode=0,
+ stdout=json.dumps({"success": True, "observed_old_version": "0.1.6", "observed_new_version": "0.1.7"}),
+ stderr="",
+ )
+ calls = {"count": 0}
+
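+ # The first _run invocation is the verify step; subsequent invocations return the helper's JSON result.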
+ def run_success(cmd, *, cwd, env=None):
+ _ = (cmd, cwd, env)
+ calls["count"] += 1
+ return verify_ok if calls["count"] == 1 else helper_ok
+
+ monkeypatch.setattr(module, "_run", run_success)
+ rc_ok = module.main(["--platform", "linux"])
+ _expect(rc_ok == 0, "Expected successful updater e2e wrapper")
diff --git a/apps/api/tests/test_scripts_quality_cli_wave2.py b/apps/api/tests/test_scripts_quality_cli_wave2.py
new file mode 100644
index 00000000..2668a337
--- /dev/null
+++ b/apps/api/tests/test_scripts_quality_cli_wave2.py
@@ -0,0 +1,360 @@
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from importlib.util import module_from_spec, spec_from_file_location
+from pathlib import Path
+
+import pytest
+
+
+def _expect(condition: bool, message: str) -> None:
+ if not condition:
+ raise AssertionError(message)
+
+
+def _repo_root() -> Path:
+ return Path(__file__).resolve().parents[3]
+
+
+def _load_quality(name: str):
+ script_path = _repo_root() / "scripts" / "quality" / f"{name}.py"
+ spec = spec_from_file_location(f"quality_{name}_wave2", script_path)
+ _expect(spec is not None and spec.loader is not None, f"Unable to load module spec for {name}")
+ module = module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+def _load_script(name: str):
+ script_path = _repo_root() / "scripts" / f"{name}.py"
+ spec = spec_from_file_location(f"script_{name}_wave2", script_path)
+ _expect(spec is not None and spec.loader is not None, f"Unable to load module spec for {name}")
+ module = module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+def test_check_quality_secrets_main_pass_and_fail(monkeypatch):
+ module = _load_quality("check_quality_secrets")
+ repo = _repo_root()
+
+ out_json_rel = "tmp/quality-wave2/check-quality-secrets.json"
+ out_md_rel = "tmp/quality-wave2/check-quality-secrets.md"
+ out_json = repo / out_json_rel
+ out_md = repo / out_md_rel
+
+ monkeypatch.setattr(
+ module,
+ "_parse_args",
+ lambda: argparse.Namespace(required_secret=[], required_var=[], out_json=out_json_rel, out_md=out_md_rel),
+ )
+
+ for name in module.DEFAULT_REQUIRED_SECRETS:
+ monkeypatch.setenv(name, "x")
+ for name in module.DEFAULT_REQUIRED_VARS:
+ monkeypatch.setenv(name, "x")
+
+ rc = module.main()
+ _expect(rc == 0, "Expected pass when all secrets/vars are set")
+ _expect(out_json.is_file(), "Expected JSON output file")
+ _expect(out_md.is_file(), "Expected markdown output file")
+
+ monkeypatch.delenv(module.DEFAULT_REQUIRED_SECRETS[0], raising=False)
+ rc_fail = module.main()
+ _expect(rc_fail == 1, "Expected fail when a required secret is missing")
+
+
+def test_check_quality_secrets_safe_output_path_escape():
+ module = _load_quality("check_quality_secrets")
+ with pytest.raises(ValueError):
+ module._safe_output_path("../escape.json", "fallback.json", base=Path.cwd())
+
+
+def test_check_required_checks_main_success_and_missing_token(monkeypatch):
+ module = _load_quality("check_required_checks")
+ repo = _repo_root()
+
+ out_json_rel = "tmp/quality-wave2/required-checks.json"
+ out_md_rel = "tmp/quality-wave2/required-checks.md"
+ out_json = repo / out_json_rel
+ out_md = repo / out_md_rel
+
+ calls = {"count": 0}
+
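+ # Early check-run polls report the gate as in progress; a later poll returns success so the wait loop can complete.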
+ def fake_api_get(_repo: str, path: str, _token: str):
+ calls["count"] += 1
+ if "check-runs" in path:
+ if calls["count"] <= 2:
+ return {"check_runs": [{"name": "Coverage 100 Gate", "status": "in_progress", "conclusion": None}]}
+ return {"check_runs": [{"name": "Coverage 100 Gate", "status": "completed", "conclusion": "success"}]}
+ return {"statuses": []}
+
+ monkeypatch.setattr(module, "_api_get", fake_api_get)
+ monkeypatch.setattr(module.time, "sleep", lambda _s: None)
+ monkeypatch.setenv("GITHUB_TOKEN", "token")
+ monkeypatch.setattr(
+ module,
+ "_parse_args",
+ lambda: argparse.Namespace(
+ repo="Prekzursil/Reframe",
+ sha="abc123",
+ required_context=["Coverage 100 Gate"],
+ timeout_seconds=5,
+ poll_seconds=1,
+ out_json=out_json_rel,
+ out_md=out_md_rel,
+ ),
+ )
+
+ rc = module.main()
+ _expect(rc == 0, "Expected success after in-progress then successful check run")
+ _expect(out_json.is_file(), "Expected required-check JSON artifact")
+ _expect(out_md.is_file(), "Expected required-check markdown artifact")
+
+ monkeypatch.delenv("GITHUB_TOKEN", raising=False)
+ monkeypatch.delenv("GH_TOKEN", raising=False)
+ with pytest.raises(SystemExit):
+ module.main()
+
+
+def test_check_codacy_zero_main_paths(monkeypatch):
+ module = _load_quality("check_codacy_zero")
+ repo = _repo_root()
+
+ out_json_rel = "tmp/quality-wave2/codacy.json"
+ out_md_rel = "tmp/quality-wave2/codacy.md"
+ out_json = repo / out_json_rel
+
+ monkeypatch.delenv("CODACY_API_TOKEN", raising=False)
+ monkeypatch.setattr(
+ module,
+ "_parse_args",
+ lambda: argparse.Namespace(repo="Prekzursil/Reframe", pull_request="", out_json=out_json_rel, out_md=out_md_rel),
+ )
+ rc_missing = module.main()
+ _expect(rc_missing == 1, "Expected fail when CODACY_API_TOKEN is missing")
+
+ monkeypatch.setenv("CODACY_API_TOKEN", "token")
+
+ def fake_request(url: str, token: str, *, method: str = "GET", data=None):
+ _ = (url, token, method, data)
+ return {"pagination": {"total": 0}}
+
+ monkeypatch.setattr(module, "_request_json", fake_request)
+ rc_repo = module.main()
+ _expect(rc_repo == 0, "Expected pass when repository open issues == 0")
+ _expect(out_json.is_file(), "Expected codacy JSON output")
+
+ monkeypatch.setattr(
+ module,
+ "_parse_args",
+ lambda: argparse.Namespace(repo="Prekzursil/Reframe", pull_request="abc", out_json=out_json_rel, out_md=out_md_rel),
+ )
+ rc_invalid_pr = module.main()
+ _expect(rc_invalid_pr == 1, "Expected fail for invalid pull request number")
+
+
+def test_check_sonar_zero_main_wait_and_exception(monkeypatch):
+ module = _load_quality("check_sonar_zero")
+
+ repo = _repo_root()
+ out_json_rel = "tmp/quality-wave2/sonar.json"
+ out_md_rel = "tmp/quality-wave2/sonar.md"
+ _ = (repo / out_json_rel, repo / out_md_rel)
+
+ sequence = iter([(2, "ERROR"), (0, "OK")])
+
+ def fake_query(**_kwargs):
+ return next(sequence)
+
+ monkeypatch.setattr(module, "_query_sonar_status", fake_query)
+ monkeypatch.setattr(module.time, "sleep", lambda _s: None)
+ monkeypatch.setenv("SONAR_TOKEN", "token")
+ monkeypatch.setattr(
+ module,
+ "_parse_args",
+ lambda: argparse.Namespace(
+ project_key="Prekzursil_Reframe",
+ token="",
+ branch="",
+ pull_request="107",
+ wait_seconds=15,
+ require_quality_gate=True,
+ ignore_open_issues=False,
+ out_json=out_json_rel,
+ out_md=out_md_rel,
+ ),
+ )
+
+ rc_wait = module.main()
+ _expect(rc_wait == 0, "Expected Sonar pass after wait loop resolves to zero")
+
+ monkeypatch.setattr(module, "_query_sonar_status", lambda **_kwargs: (_ for _ in ()).throw(RuntimeError("boom")))
+ rc_exc = module.main()
+ _expect(rc_exc == 1, "Expected Sonar fail on query exception")
+
+
+def test_check_visual_zero_percy_and_applitools_paths(monkeypatch, tmp_path):
+ module = _load_quality("check_visual_zero")
+
+ monkeypatch.setenv("PERCY_TOKEN", "token")
+ monkeypatch.setenv("GITHUB_SHA", "abc1234")
+ monkeypatch.setattr(module, "_percy_request", lambda _path, _token, query=None: {"data": []})
+ clock = {"t": 0.0}
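+ # Each read of the fake monotonic clock jumps ahead by 301 seconds, so the Percy lookup gives up quickly and reports the build as unavailable.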
+ monkeypatch.setattr(module.time, "monotonic", lambda: clock.__setitem__("t", clock["t"] + 301.0) or clock["t"])
+ monkeypatch.setattr(module.time, "sleep", lambda _s: None)
+ status, details, findings = module._run_percy(argparse.Namespace(percy_token="", sha="", branch="main"))
+ _expect(status == "pass", "Expected pass when Percy build is unavailable")
+ _expect(details.get("lookup_mode") == "unavailable", "Expected unavailable lookup mode")
+ _expect(findings, "Expected informational finding")
+
+ monkeypatch.setattr(
+ module,
+ "_percy_request",
+ lambda _path, _token, query=None: {
+ "data": [
+ {
+ "id": "1",
+ "attributes": {
+ "created-at": "2026-03-04T00:00:00Z",
+ "review-state": "unreviewed",
+ "total-comparisons-diff": 2,
+ },
+ }
+ ]
+ },
+ )
+ monkeypatch.setattr(module.time, "monotonic", lambda: 0.0)
+ status_fail, _details_fail, findings_fail = module._run_percy(argparse.Namespace(percy_token="", sha="abc1234", branch="main"))
+ _expect(status_fail == "fail", "Expected fail for unresolved Percy diffs")
+ _expect(any("unresolved visual diffs" in item for item in findings_fail), "Expected unresolved diff finding")
+
+ missing_status, _missing_details, _missing_findings = module._run_applitools(
+ argparse.Namespace(applitools_results="", provider="applitools")
+ )
+ _expect(missing_status == "fail", "Expected fail when applitools results path is missing")
+
+ results_path = _repo_root() / "tmp" / "quality-wave2" / "applitools.json"
+ results_path.parent.mkdir(parents=True, exist_ok=True)
+ results_path.write_text(json.dumps({"unresolved": 0, "mismatches": 0, "missing": 0}), encoding="utf-8")
+ ok_status, _ok_details, ok_findings = module._run_applitools(
+ argparse.Namespace(applitools_results="tmp/quality-wave2/applitools.json", provider="applitools")
+ )
+ _expect(ok_status == "pass", "Expected pass when applitools metrics are zero")
+ _expect(ok_findings == [], "Expected no findings for zero applitools metrics")
+
+
+def test_percy_auto_approve_main_paths(monkeypatch, capsys):
+ module = _load_quality("percy_auto_approve")
+
+ monkeypatch.delenv("PERCY_TOKEN", raising=False)
+ rc_missing = module.main(["--sha", "abc1234"])
+ _expect(rc_missing == 1, "Expected missing token failure")
+
+ monkeypatch.setenv("PERCY_TOKEN", "token")
+ rc_bad_sha = module.main(["--sha", "not-sha"])
+ _expect(rc_bad_sha == 1, "Expected invalid SHA failure")
+
+ monkeypatch.setattr(module, "_query_builds", lambda **_kwargs: {"data": []})
+ rc_no_build = module.main(["--sha", "abc1234", "--retry-attempts", "1"])
+ _expect(rc_no_build == 0, "Expected no-build path to be informational success")
+
+ requested = {"approved": False}
+
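+ # Flag the approval POST to /reviews so the test can assert a review was submitted; other Percy calls return empty payloads.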
+ def fake_request_json(**kwargs):
+ if kwargs.get("method") == "POST" and kwargs.get("path") == "/reviews":
+ requested["approved"] = True
+ return {"ok": True}
+ return {}
+
+ monkeypatch.setattr(
+ module,
+ "_query_builds",
+ lambda **_kwargs: {
+ "data": [
+ {
+ "id": "build-1",
+ "attributes": {
+ "created-at": "2026-03-04T00:00:00Z",
+ "state": "finished",
+ "review-state": "unreviewed",
+ },
+ }
+ ]
+ },
+ )
+ monkeypatch.setattr(module, "_request_json", fake_request_json)
+
+ rc_approve = module.main(["--sha", "abc1234", "--retry-attempts", "1"])
+ _expect(rc_approve == 0, "Expected build approval path success")
+ _expect(requested["approved"], "Expected Percy review approval POST")
+ _expect("approved=true" in capsys.readouterr().out, "Expected approved output marker")
+
+
+def test_upsert_ops_digest_main_error_paths(monkeypatch):
+ module = _load_script("upsert_ops_digest_issue")
+ repo = _repo_root()
+
+ digest_json_rel = "tmp/quality-wave2/digest.json"
+ digest_md_rel = "tmp/quality-wave2/digest.md"
+ out_json_rel = "tmp/quality-wave2/digest-out.json"
+
+ digest_json = repo / digest_json_rel
+ digest_md = repo / digest_md_rel
+ digest_json.parent.mkdir(parents=True, exist_ok=True)
+ digest_json.write_text(json.dumps({"metrics": {}, "trends": {}, "health": {}}), encoding="utf-8")
+ digest_md.write_text("# digest\n", encoding="utf-8")
+
+ monkeypatch.delenv("GITHUB_TOKEN", raising=False)
+ monkeypatch.delenv("GH_TOKEN", raising=False)
+ monkeypatch.setattr(
+ module,
+ "parse_args",
+ lambda: argparse.Namespace(
+ repo="Prekzursil/Reframe",
+ digest_json=digest_json_rel,
+ digest_md=digest_md_rel,
+ out_json=out_json_rel,
+ title="Weekly Ops Digest (rolling)",
+ ),
+ )
+ with pytest.raises(SystemExit):
+ module.main()
+
+ monkeypatch.setenv("GITHUB_TOKEN", "token")
+ monkeypatch.setattr(
+ module,
+ "parse_args",
+ lambda: argparse.Namespace(
+ repo="invalid-repo",
+ digest_json=digest_json_rel,
+ digest_md=digest_md_rel,
+ out_json=out_json_rel,
+ title="Weekly Ops Digest (rolling)",
+ ),
+ )
+ with pytest.raises(SystemExit):
+ module.main()
+
+
+def test_release_readiness_run_json_and_collect_status(monkeypatch, tmp_path):
+ module = _load_script("release_readiness_report")
+
+ monkeypatch.setattr(module.subprocess, "run", lambda *args, **kwargs: (_ for _ in ()).throw(FileNotFoundError("missing")))
+ _expect(module._run_json(["gh"], cwd=tmp_path) is None, "Expected None when subprocess binary is missing")
+
+ monkeypatch.setattr(module, "_main_sha", lambda _repo: "abc")
+ monkeypatch.setattr(module, "_run_json", lambda _cmd, cwd: {"unexpected": True})
+ status = module._collect_gh_status(tmp_path)
+ _expect(status["ci"] is None and status["codeql"] is None, "Expected null workflow snapshots for malformed runs payload")
+ _expect(isinstance(status["branch_protection"], dict), "Expected branch protection payload dictionary")
+ _expect(status["branch_protection"].get("required_reviews") is None, "Expected missing required_reviews for malformed payload")
diff --git a/apps/api/tests/test_scripts_quality_gates.py b/apps/api/tests/test_scripts_quality_gates.py
index c6780a87..5b8085bc 100644
--- a/apps/api/tests/test_scripts_quality_gates.py
+++ b/apps/api/tests/test_scripts_quality_gates.py
@@ -48,7 +48,7 @@ def test_assert_coverage_100_parses_xml_and_lcov(tmp_path):
_expect(xml_stats.percent == 100.0, "Expected XML coverage percent to be 100")
_expect(lcov_stats.percent == 100.0, "Expected LCOV coverage percent to be 100")
- status, findings = module.evaluate([xml_stats, lcov_stats])
+ status, findings, _metrics = module.evaluate([xml_stats, lcov_stats], expected_inventory=None)
_expect(status == "pass", "Expected pass when all components are at 100%")
_expect(findings == [], "Expected no findings for full coverage")
@@ -60,7 +60,7 @@ def test_assert_coverage_100_detects_below_target(tmp_path):
lcov_path.write_text("TN:\nSF:file.ts\nLF:4\nLH:3\nend_of_record\n", encoding="utf-8")
stats = module.parse_lcov("web", lcov_path)
- status, findings = module.evaluate([stats])
+ status, findings, _metrics = module.evaluate([stats], expected_inventory=None)
_expect(status == "fail", "Expected fail when a component is below 100%")
_expect(any("below 100%" in item for item in findings), "Expected below-100 finding")
@@ -137,3 +137,49 @@ def test_sonar_evaluate_status_still_enforces_quality_gate():
)
_expect(any("quality gate" in item for item in findings), "Expected quality gate finding")
+
+
+def test_assert_coverage_inventory_skips_empty_files(tmp_path, monkeypatch):
+ module = _load_module("assert_coverage_100")
+
+ empty_init = tmp_path / "apps" / "api" / "app" / "__init__.py"
+ empty_init.parent.mkdir(parents=True, exist_ok=True)
+ empty_init.write_text("", encoding="utf-8")
+
+ main_py = empty_init.parent / "main.py"
+ main_py.write_text("print('ok')\n", encoding="utf-8")
+
+ monkeypatch.setattr(
+ module,
+ "_load_git_tracked_files",
+ lambda _root: ["apps/api/app/__init__.py", "apps/api/app/main.py"],
+ )
+
+ expected = module._collect_expected_inventory(tmp_path)
+
+ _expect("apps/api/app/main.py" in expected, "Expected non-empty tracked source file in inventory")
+ _expect("apps/api/app/__init__.py" not in expected, "Expected empty tracked file to be skipped")
+
+
+def test_assert_coverage_inventory_skips_python_metadata_only_file(tmp_path, monkeypatch):
+ module = _load_module("assert_coverage_100")
+
+ metadata_init = tmp_path / "packages" / "media-core" / "src" / "media_core" / "__init__.py"
+ metadata_init.parent.mkdir(parents=True, exist_ok=True)
+ metadata_init.write_text('"""pkg"""\n\n__all__ = []\n', encoding="utf-8")
+
+ logic_file = metadata_init.parent / "core.py"
+ logic_file.write_text("VALUE = 1\n", encoding="utf-8")
+
+ monkeypatch.setattr(
+ module,
+ "_load_git_tracked_files",
+ lambda _root: [
+ "packages/media-core/src/media_core/__init__.py",
+ "packages/media-core/src/media_core/core.py",
+ ],
+ )
+
+ expected = module._collect_expected_inventory(tmp_path)
+
+ _expect("packages/media-core/src/media_core/__init__.py" not in expected, "Expected metadata-only module file to be skipped")
+ _expect("packages/media-core/src/media_core/core.py" in expected, "Expected executable module file in inventory")
diff --git a/apps/api/tests/test_scripts_quality_gates_extended.py b/apps/api/tests/test_scripts_quality_gates_extended.py
new file mode 100644
index 00000000..4edb3995
--- /dev/null
+++ b/apps/api/tests/test_scripts_quality_gates_extended.py
@@ -0,0 +1,285 @@
+from __future__ import annotations
+
+import argparse
+import sys
+from importlib.util import module_from_spec, spec_from_file_location
+from pathlib import Path
+
+
+def _expect(condition: bool, message: str) -> None:
+ if not condition:
+ raise AssertionError(message)
+
+
+def _load_quality(name: str):
+ repo_root = Path(__file__).resolve().parents[3]
+ script_dir = repo_root / "scripts" / "quality"
+ if str(script_dir) not in sys.path:
+ sys.path.insert(0, str(script_dir))
+ module_path = script_dir / f"{name}.py"
+ spec = spec_from_file_location(name, module_path)
+ _expect(spec is not None and spec.loader is not None, f"Unable to load module spec for {name}")
+ module = module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+def test_check_codacy_zero_main_paths(monkeypatch):
+ module = _load_quality("check_codacy_zero")
+
+ args = argparse.Namespace(repo="owner/repo", pull_request="", out_json="out/codacy.json", out_md="out/codacy.md")
+ monkeypatch.setattr(module, "_parse_args", lambda: args)
+ monkeypatch.delenv("CODACY_API_TOKEN", raising=False)
+ monkeypatch.setenv("GITHUB_REPOSITORY", "owner/repo")
+
+ _expect(module.main() == 1, "Expected missing CODACY_API_TOKEN to fail")
+
+ monkeypatch.setenv("CODACY_API_TOKEN", "token")
+ bad_args = argparse.Namespace(repo="bad slug", pull_request="", out_json="out/codacy.json", out_md="out/codacy.md")
+ monkeypatch.setattr(module, "_parse_args", lambda: bad_args)
+ _expect(module.main() == 1, "Expected invalid repo slug to fail")
+
+ calls = {"count": 0}
+
+ def fake_request(url: str, token: str, *, method: str = "GET", data=None):
+ _ = (url, token, method, data)
+ calls["count"] += 1
+ if calls["count"] == 1:
+ return {"analyzed": False, "pagination": {"total": 0}}
+ return {"analyzed": True, "pagination": {"total": 0}}
+
+ pr_args = argparse.Namespace(repo="owner/repo", pull_request="107", out_json="out/codacy.json", out_md="out/codacy.md")
+ monkeypatch.setattr(module, "_parse_args", lambda: pr_args)
+ monkeypatch.setattr(module, "_request_json", fake_request)
+ monkeypatch.setattr(module.time, "sleep", lambda _n: None)
+
+ _expect(module.main() == 0, "Expected PR scope to pass when open issues are zero")
+
+
+def test_check_deepscan_zero_main_paths(monkeypatch):
+ module = _load_quality("check_deepscan_zero")
+
+ args = argparse.Namespace(out_json="out/deepscan.json", out_md="out/deepscan.md")
+ monkeypatch.setattr(module, "_parse_args", lambda: args)
+ monkeypatch.delenv("GITHUB_TOKEN", raising=False)
+ monkeypatch.delenv("GH_TOKEN", raising=False)
+ monkeypatch.delenv("GITHUB_REPOSITORY", raising=False)
+ monkeypatch.delenv("GITHUB_SHA", raising=False)
+ _expect(module.main() == 1, "Expected missing GitHub context to fail")
+
+ monkeypatch.setenv("GITHUB_TOKEN", "token")
+ monkeypatch.setenv("GITHUB_REPOSITORY", "Prekzursil/Reframe")
+ monkeypatch.setenv("GITHUB_SHA", "abc123")
+
+ payload = {
+ "check_runs": [
+ {
+ "name": "DeepScan",
+ "conclusion": "success",
+ "details_url": "https://deepscan.io/analysis",
+ "output": {"summary": "0 new and 2 fixed issues"},
+ "completed_at": "2026-03-04T00:00:00Z",
+ }
+ ]
+ }
+ monkeypatch.setattr(module, "_request_json", lambda _url, _token: payload)
+
+ _expect(module.main() == 0, "Expected DeepScan zero-main path to pass")
+
+
+def test_check_deepscan_zero_status_context_fallback(monkeypatch):
+ module = _load_quality("check_deepscan_zero")
+
+ args = argparse.Namespace(out_json="out/deepscan-status.json", out_md="out/deepscan-status.md")
+ monkeypatch.setattr(module, "_parse_args", lambda: args)
+ monkeypatch.setenv("GITHUB_TOKEN", "token")
+ monkeypatch.setenv("GITHUB_REPOSITORY", "Prekzursil/Reframe")
+ monkeypatch.setenv("GITHUB_SHA", "abc123")
+
+ def fake_request(url: str, _token: str):
+ if "check-runs" in url:
+ return {"check_runs": []}
+ return {
+ "statuses": [
+ {
+ "context": "DeepScan",
+ "state": "success",
+ "description": "0 new and 1 fixed issues",
+ "target_url": "https://deepscan.io/dashboard",
+ "updated_at": "2026-03-04T01:00:00Z",
+ }
+ ]
+ }
+
+ monkeypatch.setattr(module, "_request_json", fake_request)
+
+ _expect(module.main() == 0, "Expected status-context fallback to pass when new issues are zero")
+
+
+def test_check_sentry_zero_main_paths(monkeypatch):
+ module = _load_quality("check_sentry_zero")
+ args = argparse.Namespace(out_json="out/sentry.json", out_md="out/sentry.md")
+ monkeypatch.setattr(module, "_parse_args", lambda: args)
+
+ monkeypatch.delenv("SENTRY_AUTH_TOKEN", raising=False)
+ monkeypatch.setenv("SENTRY_ORG", "andrei-visalon")
+ monkeypatch.setenv("SENTRY_PROJECT_BACKEND", "reframe-backend")
+ monkeypatch.setenv("SENTRY_PROJECT_WEB", "reframe-web")
+ _expect(module.main() == 1, "Expected missing token to fail")
+
+ monkeypatch.setenv("SENTRY_AUTH_TOKEN", "token")
+
+ def fake_request(_url: str, _token: str):
+ return [], {"x-hits": "0"}
+
+ monkeypatch.setattr(module, "_request", fake_request)
+ _expect(module.main() == 0, "Expected sentry zero check to pass when unresolved=0")
+
+
+def test_check_sonar_zero_main_paths(monkeypatch):
+ module = _load_quality("check_sonar_zero")
+
+ args = argparse.Namespace(
+ project_key="Prekzursil_Reframe",
+ token="",
+ branch="",
+ pull_request="107",
+ wait_seconds=0,
+ require_quality_gate=True,
+ ignore_open_issues=False,
+ out_json="out/sonar.json",
+ out_md="out/sonar.md",
+ )
+ monkeypatch.setattr(module, "_parse_args", lambda: args)
+ monkeypatch.delenv("SONAR_TOKEN", raising=False)
+ _expect(module.main() == 1, "Expected missing SONAR_TOKEN to fail")
+
+ monkeypatch.setenv("SONAR_TOKEN", "token")
+ monkeypatch.setattr(module, "_query_sonar_status", lambda **_kwargs: (0, "OK"))
+ _expect(module.main() == 0, "Expected sonar zero to pass with open issues 0 and gate OK")
+
+
+def test_check_required_checks_main_paths(monkeypatch):
+ module = _load_quality("check_required_checks")
+ args = argparse.Namespace(
+ repo="Prekzursil/Reframe",
+ sha="1234",
+ required_context=["CI", "Coverage 100 Gate"],
+ timeout_seconds=1,
+ poll_seconds=1,
+ out_json="out/required.json",
+ out_md="out/required.md",
+ )
+ monkeypatch.setattr(module, "_parse_args", lambda: args)
+
+ monkeypatch.setenv("GITHUB_TOKEN", "token")
+
+ def fake_api_get(repo: str, path: str, token: str):
+ _ = (repo, token)
+ if "check-runs" in path:
+ return {
+ "check_runs": [
+ {"name": "CI", "status": "completed", "conclusion": "success"},
+ {"name": "Coverage 100 Gate", "status": "completed", "conclusion": "success"},
+ ]
+ }
+ return {"statuses": []}
+
+ monkeypatch.setattr(module, "_api_get", fake_api_get)
+
+ _expect(module.main() == 0, "Expected required-checks gate to pass with all contexts successful")
+
+
+def test_check_visual_zero_percy_and_applitools(monkeypatch, tmp_path):
+ module = _load_quality("check_visual_zero")
+
+ percy_args = argparse.Namespace(
+ provider="percy",
+ sha="abc1234",
+ branch="feat",
+ percy_token="token",
+ applitools_results="",
+ out_json="tmp/percy.json",
+ out_md="tmp/percy.md",
+ )
+ monkeypatch.setattr(module, "_parse_args", lambda: percy_args)
+
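+ # A finished, approved Percy build with zero comparison diffs should let the visual gate pass.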
+ payload = {
+ "data": [
+ {
+ "id": "build-1",
+ "attributes": {
+ "created-at": "2026-03-04T00:00:00Z",
+ "review-state": "approved",
+ "total-comparisons-diff": 0,
+ },
+ }
+ ]
+ }
+ monkeypatch.setattr(module, "_percy_request", lambda *_args, **_kwargs: payload)
+ monkeypatch.setattr(module.time, "sleep", lambda _n: None)
+
+ _expect(module.main() == 0, "Expected Percy visual check to pass")
+
+ applitools_json = Path("tmp/applitools-input.json")
+ applitools_json.parent.mkdir(parents=True, exist_ok=True)
+ applitools_json.write_text('{"unresolved":0,"mismatches":0,"missing":0}', encoding="utf-8")
+
+ applitools_args = argparse.Namespace(
+ provider="applitools",
+ sha="",
+ branch="",
+ percy_token="",
+ applitools_results=str(applitools_json),
+ out_json="tmp/applitools-out.json",
+ out_md="tmp/applitools-out.md",
+ )
+ monkeypatch.setattr(module, "_parse_args", lambda: applitools_args)
+ _expect(module.main() == 0, "Expected Applitools visual check to pass")
+
+
+def test_percy_auto_approve_paths(monkeypatch):
+ module = _load_quality("percy_auto_approve")
+
+ monkeypatch.delenv("PERCY_TOKEN", raising=False)
+ _expect(module.main(["--sha", "abcdef1"]) == 1, "Expected missing token path to fail")
+
+ monkeypatch.setenv("PERCY_TOKEN", "token")
+ _expect(module.main(["--sha", "bad-sha"]) == 1, "Expected invalid SHA to fail")
+
+ monkeypatch.setattr(module, "_query_builds", lambda **_kwargs: {"data": []})
+ monkeypatch.setattr(module.time, "sleep", lambda _n: None)
+ _expect(
+ module.main(["--sha", "abcdef1", "--retry-attempts", "1", "--retry-delay-seconds", "1"]) == 0,
+ "Expected no-unreviewed-build path to be informational pass",
+ )
+
+ posted = {"called": False}
+
+ def fake_query(**_kwargs):
+ return {
+ "data": [
+ {
+ "id": "b1",
+ "attributes": {"state": "finished", "review-state": "unreviewed", "created-at": "2026-03-04"},
+ }
+ ]
+ }
+
+ def fake_request_json(*, token, method, path, query=None, payload=None, basic_auth=None):
+ _ = (token, query, basic_auth)
+ if method == "POST":
+ posted["called"] = True
+ _expect(path == "/reviews", "Expected reviews endpoint for approval")
+ _expect(payload is not None, "Expected review payload")
+ return {"data": []}
+
+ monkeypatch.setattr(module, "_query_builds", fake_query)
+ monkeypatch.setattr(module, "_request_json", fake_request_json)
+
+ rc = module.main(["--sha", "abcdef1", "--retry-attempts", "1", "--retry-delay-seconds", "1"])
+ _expect(rc == 0, "Expected successful Percy auto-approval")
+ _expect(posted["called"], "Expected approval POST to be executed")
diff --git a/apps/desktop/index.html b/apps/desktop/index.html
index 052a6280..5dc50173 100644
--- a/apps/desktop/index.html
+++ b/apps/desktop/index.html
@@ -10,34 +10,56 @@
-