diff --git a/extralit-server/pdm.lock b/extralit-server/pdm.lock
index 98d795cef..9dd229c3d 100644
--- a/extralit-server/pdm.lock
+++ b/extralit-server/pdm.lock
@@ -5,7 +5,7 @@
 groups = ["default", "marker", "postgresql", "test"]
 strategy = []
 lock_version = "4.5.0"
-content_hash = "sha256:5c7abbe8ca580bf2529d321defa5d14ce03cec41bfa7ac8c60db5099a9d72404"
+content_hash = "sha256:c3a2035f7b8cc1ef9c8d04cc9ded68195f03418c57b861d1b5d9c60b08585a57"
 
 [[metadata.targets]]
 requires_python = ">=3.10"
@@ -300,15 +300,6 @@ files = [
     {file = "attrs-24.2.0.tar.gz", hash = "sha256:5cfb1b9148b5b086569baec03f20d7b6bf3bcacc9a42bebf87ffaaca362f6346"},
 ]
 
-[[package]]
-name = "backoff"
-version = "2.2.1"
-summary = ""
-files = [
-    {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"},
-    {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"},
-]
-
 [[package]]
 name = "backports-asyncio-runner"
 version = "1.2.0"
diff --git a/extralit-server/pyproject.toml b/extralit-server/pyproject.toml
index c09229f65..767c557d1 100644
--- a/extralit-server/pyproject.toml
+++ b/extralit-server/pyproject.toml
@@ -28,7 +28,7 @@ dependencies = [
     "opensearch-py ~= 2.0.0",
     "elasticsearch8[async] ~= 8.7.0",
     "brotli-asgi ~= 1.4.0",
-    "backoff ~= 2.2.0",
+    "tenacity>=9.1.2",
     # Database dependencies
     "alembic ~= 1.13.0",
     "SQLAlchemy ~= 2.0.0",
@@ -194,12 +194,13 @@ ignore = [
    "B904", # exception chaining (will phase in later)
    "UP007", # | union syntax modernization
    "UP045", # Optional[...] -> X | None modernization
+   "UP038", # non-pep604-isinstance (tuple vs | union in isinstance)
    "B027", # empty abstract method not marked abstract
    "B024", # abstract base class without abstract methods
    "B017", # broad exception asserts in tests
    "RUF012", # mutable class var typing inference
    "B008", # FastAPI Depends pattern
-   "FAST002" # FastAPI Depends suggestion
+   "FAST002", # FastAPI Depends suggestion
 ]
 
 [tool.ruff.lint.per-file-ignores]
diff --git a/extralit-server/src/extralit_server/_app.py b/extralit-server/src/extralit_server/_app.py
index 2c236fccf..d14db4d08 100644
--- a/extralit-server/src/extralit_server/_app.py
+++ b/extralit-server/src/extralit_server/_app.py
@@ -25,7 +25,6 @@
 from typing import Annotated
 from urllib.parse import urlencode
 
-import backoff
 import redis
 from brotli_asgi import BrotliMiddleware
 from fastapi import FastAPI, Query, Request
@@ -34,6 +33,7 @@
 from starlette.datastructures import URL
 from starlette.middleware.cors import CORSMiddleware
 from starlette.responses import HTMLResponse, RedirectResponse
+from tenacity import retry, stop_after_delay, wait_exponential
 
 from extralit_server import helpers
 from extralit_server._version import __version__ as extralit_version
@@ -361,7 +361,7 @@ async def configure_search_engine():
     logging.getLogger("opensearch").setLevel(logging.ERROR)
     logging.getLogger("opensearch_transport").setLevel(logging.ERROR)
 
-    @backoff.on_exception(backoff.expo, ConnectionError, max_time=60)
+    @retry(stop=stop_after_delay(60), wait=wait_exponential(multiplier=1, min=1, max=60))
     async def ping_search_engine():
         async for search_engine in get_search_engine():
             if not await search_engine.ping():
@@ -376,7 +376,7 @@ async def ping_search_engine():
 
 
 def configure_redis():
-    @backoff.on_exception(backoff.expo, ConnectionError, max_time=60)
+    @retry(stop=stop_after_delay(60), wait=wait_exponential(multiplier=1, min=1, max=60))
     def ping_redis():
         try:
             REDIS_CONNECTION.ping()
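Note on the _app.py hunks above: backoff.on_exception(backoff.expo, ConnectionError, max_time=60) retried only ConnectionError and re-raised it once max_time was exceeded, while a bare @retry(stop=..., wait=...) retries on any exception and raises tenacity.RetryError when it gives up. If the narrower behaviour is ever wanted again, tenacity can express it with its public options. A minimal, self-contained sketch (not part of the patch; the ping() function and the attempt counter are made up for the demo):

    import asyncio

    from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

    attempts = {"n": 0}


    @retry(
        retry=retry_if_exception_type(ConnectionError),  # retry only connection errors, like backoff did
        stop=stop_after_delay(60),                       # overall time budget, equivalent of max_time=60
        wait=wait_exponential(multiplier=1, min=1, max=60),
        reraise=True,                                    # surface the last ConnectionError, not tenacity.RetryError
    )
    async def ping():
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise ConnectionError("engine not reachable yet")
        return "pong"


    if __name__ == "__main__":
        # Succeeds on the third attempt, after roughly 1s + 2s of exponential waits.
        print(asyncio.run(ping()), "after", attempts["n"], "attempts")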
diff --git a/extralit-server/src/extralit_server/api/handlers/v1/workspaces.py b/extralit-server/src/extralit_server/api/handlers/v1/workspaces.py
index f2adbdf0f..a350f0192 100644
--- a/extralit-server/src/extralit_server/api/handlers/v1/workspaces.py
+++ b/extralit-server/src/extralit_server/api/handlers/v1/workspaces.py
@@ -359,6 +359,151 @@ async def workspace_doctor(
             )
         )
 
+    # Check 5: Database connections health with autofix
+    try:
+        import asyncio
+
+        from sqlalchemy import text
+        from sqlalchemy.exc import SQLAlchemyError
+
+        # Import the async engine to access database URL and dispose method
+        from extralit_server.database import async_engine
+
+        # Get database URL and detect type
+        db_url = str(async_engine.url)
+        is_postgresql = "postgresql" in db_url.lower()
+        is_sqlite = "sqlite" in db_url.lower()
+
+        async def check_db_health():
+            if is_postgresql:
+                # A. Liveness Check (The most important part)
+                # Simple query to prove the TCP pipe is open.
+                await db.execute(text("SELECT 1"))
+
+                # B. Deep Inspection (Optional - only if Liveness passes)
+                active_connections_query = text("""
+                    SELECT
+                        count(*) as active_connections,
+                        count(*) filter (where state = 'idle in transaction' and now() - state_change > interval '1 minute') as stale_transaction_connections
+                    FROM pg_stat_activity
+                    WHERE datname = current_database()
+                """)
+
+                result = await db.execute(active_connections_query)
+                conn_stats = result.first()
+
+                if conn_stats:
+                    stale_txn = conn_stats.stale_transaction_connections
+                    active = conn_stats.active_connections
+
+                    if stale_txn > 0:
+                        # Idle in transaction is BAD. It blocks table locks.
+                        checks.append(
+                            WorkspaceDoctorCheckResult(
+                                check_name="db_transaction_health",
+                                status="warning",
+                                message=f"Found {stale_txn} connections idle in transaction.",
+                            )
+                        )
+                    else:
+                        checks.append(
+                            WorkspaceDoctorCheckResult(
+                                check_name="db_connection_health",
+                                status="ok",
+                                message=f"Pool healthy. Active DB sessions: {active}",
+                            )
+                        )
+
+            elif is_sqlite:
+                # SQLite doesn't have connection pooling like PostgreSQL
+                # Just check if we can execute a simple query
+                simple_query = text("SELECT 1 as test")
+                result = await db.execute(simple_query)
+                test_result = result.scalar()
+
+                if test_result == 1:
+                    checks.append(
+                        WorkspaceDoctorCheckResult(
+                            check_name="database_connections",
+                            status="ok",
+                            message="SQLite database connection is healthy (SQLite uses file-based connections, no pooling)",
+                            fixed=False,
+                        )
+                    )
+                else:
+                    checks.append(
+                        WorkspaceDoctorCheckResult(
+                            check_name="database_connections",
+                            status="error",
+                            message="SQLite database connection test failed",
+                            fixed=False,
+                        )
+                    )
+            else:
+                # Other database types - skip detailed connection monitoring
+                checks.append(
+                    WorkspaceDoctorCheckResult(
+                        check_name="database_connections",
+                        status="ok",
+                        message=f"Database connection check skipped for {db_url.split('://')[0]} (unsupported for detailed monitoring)",
+                        fixed=False,
+                    )
+                )
+
+        # SAFETY TIMEOUT: If DB doesn't answer in 3s, the connection is dead.
+        # This prevents the doctor check from hanging for 15 mins.
+        await asyncio.wait_for(check_db_health(), timeout=3.0)
+
+    except (asyncio.TimeoutError, SQLAlchemyError, OSError) as e:
+        # Handle database connectivity issues
+        if autofix:
+            # THE AUTOFIX LOGIC
+            # If we timed out or got a connection error, the pool is likely stale.
+
+            error_msg = f"Database unresponsive (Timeout/Error). Resetting connection pool. Error: {e!s}"
+
+            # DISPOSE THE POOL
+            # This closes all internal sockets. The next request will force a fresh handshake.
+            try:
+                await async_engine.dispose()
+                checks.append(
+                    WorkspaceDoctorCheckResult(
+                        check_name="database_connections",
+                        status="warning",  # Warning because we had to reset
+                        message=error_msg,
+                        fixed=True,  # We successfully reset the pool
+                    )
+                )
+            except Exception as dispose_error:
+                error_msg += f" (Failed to dispose pool: {dispose_error!s})"
+                checks.append(
+                    WorkspaceDoctorCheckResult(
+                        check_name="database_connections",
+                        status="error",
+                        message=error_msg,
+                        fixed=False,
+                    )
+                )
+        else:
+            # Just report the issue without fixing
+            checks.append(
+                WorkspaceDoctorCheckResult(
+                    check_name="database_connections",
+                    status="error",
+                    message=f"Database unresponsive (Timeout/Error). Run with --autofix to automatically reset connection pool. Error: {e!s}",
+                    fixed=False,
+                )
+            )
+    except Exception as e:
+        checks.append(
+            WorkspaceDoctorCheckResult(
+                check_name="database_connections",
+                status="warning",
+                message=f"Could not check database connections: {e!s}",
+                fixed=False,
+            )
+        )
+
     # Determine overall status
     has_errors = any(check.status == "error" for check in checks)
     has_fixed = any(check.fixed for check in checks)
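The new doctor check above combines two independent safeguards: asyncio.wait_for(..., timeout=3.0) puts a hard ceiling on the liveness probe so a dead socket cannot hang the request, and AsyncEngine.dispose() is the autofix, closing every pooled connection so the next checkout performs a fresh handshake. The same pattern in isolation, runnable against a throwaway engine (sketch only; the in-memory SQLite URL and the aiosqlite driver are stand-ins for the application's async_engine):

    import asyncio

    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine


    async def probe_and_reset(engine: AsyncEngine, timeout: float = 3.0) -> bool:
        """Return True if the engine answers in time, otherwise reset its pool."""

        async def liveness():
            async with engine.connect() as conn:
                await conn.execute(text("SELECT 1"))

        try:
            # Hard ceiling: without it a stale connection blocks until the TCP timeout.
            await asyncio.wait_for(liveness(), timeout=timeout)
            return True
        except (asyncio.TimeoutError, OSError):
            # dispose() closes every pooled connection; new ones are opened lazily on demand.
            await engine.dispose()
            return False


    if __name__ == "__main__":
        engine = create_async_engine("sqlite+aiosqlite:///:memory:")
        print(asyncio.run(probe_and_reset(engine)))  # True: an in-memory database answers immediately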
diff --git a/extralit-server/src/extralit_server/cli/search_engine/reindex.py b/extralit-server/src/extralit_server/cli/search_engine/reindex.py
index 1b62d1a2e..3ac492234 100644
--- a/extralit-server/src/extralit_server/cli/search_engine/reindex.py
+++ b/extralit-server/src/extralit_server/cli/search_engine/reindex.py
@@ -24,7 +24,7 @@
 from sqlalchemy.orm import selectinload
 
 from extralit_server.cli.rich import echo_in_panel
-from extralit_server.database import AsyncSessionLocal, retry_db_operation
+from extralit_server.database import AsyncSessionLocal, db_retry_policy
 from extralit_server.models import Dataset, Record, Response, Suggestion
 from extralit_server.search_engine import SearchEngine, get_search_engine
 
@@ -33,7 +33,7 @@ class Reindexer:
     YIELD_PER = 100
 
     @classmethod
-    @retry_db_operation(max_retries=3, delay=1.0, backoff=2.0)
+    @db_retry_policy
     async def reindex_dataset(cls, db: AsyncSession, search_engine: SearchEngine, dataset_id: UUID) -> Dataset:
         dataset = (
             await db.execute(
@@ -55,7 +55,7 @@ async def reindex_dataset(cls, db: AsyncSession, search_engine: SearchEngine, da
 
     @classmethod
     async def reindex_datasets(cls, db: AsyncSession, search_engine: SearchEngine) -> AsyncGenerator[Dataset, None]:
-        @retry_db_operation(max_retries=3, delay=1.0, backoff=2.0)
+        @db_retry_policy
         async def _get_datasets_batch(offset: int = 0, limit: int = cls.YIELD_PER):
             """Get a batch of datasets with retry logic"""
             return await db.execute(
@@ -109,7 +109,7 @@ async def _get_datasets_batch(offset: int = 0, limit: int = cls.YIELD_PER):
     async def reindex_dataset_records(
         cls, db: AsyncSession, search_engine: SearchEngine, dataset: Dataset
     ) -> AsyncGenerator[list[Record], None]:
-        @retry_db_operation(max_retries=3, delay=1.0, backoff=2.0)
+        @db_retry_policy
         async def _get_records_batch(offset: int = 0, limit: int = cls.YIELD_PER):
             """Get a batch of records with retry logic"""
             return await db.execute(
@@ -162,12 +162,12 @@ async def _get_records_batch(offset: int = 0, limit: int = cls.YIELD_PER):
             continue
 
     @classmethod
-    @retry_db_operation(max_retries=3, delay=0.5, backoff=2.0)
+    @db_retry_policy
     async def count_datasets(cls, db: AsyncSession) -> int:
         return (await db.execute(select(func.count(Dataset.id)))).scalar_one()
 
     @classmethod
-    @retry_db_operation(max_retries=3, delay=0.5, backoff=2.0)
+    @db_retry_policy
     async def count_dataset_records(cls, db: AsyncSession, dataset: Dataset) -> int:
         return (await db.execute(select(func.count(Record.id)).filter_by(dataset_id=dataset.id))).scalar_one()
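Unlike the old retry_db_operation(max_retries=..., delay=..., backoff=...) factory, db_retry_policy is an already-configured tenacity decorator, which is why it is applied above without parentheses. A small sketch of what a decorated call behaves like (illustrative only; flaky_count is invented, and importing extralit_server.database assumes the server settings are available):

    import asyncio

    from extralit_server.database import db_retry_policy

    calls = {"n": 0}


    @db_retry_policy  # no parentheses: the policy object is already parameterised
    async def flaky_count():
        calls["n"] += 1
        if calls["n"] < 3:
            # Recognised by is_db_connection_error(), so the policy retries (up to 3 attempts).
            raise ConnectionRefusedError("connection refused")
        return 42


    if __name__ == "__main__":
        print(asyncio.run(flaky_count()), "after", calls["n"], "attempts")

If a particular batch ever needs a wider budget, tenacity also attaches a retry_with(...) helper to the wrapped function (e.g. flaky_count.retry_with(stop=stop_after_attempt(5))()), so the shared policy itself does not have to change.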
diff --git a/extralit-server/src/extralit_server/contexts/accounts.py b/extralit-server/src/extralit_server/contexts/accounts.py
index 52cd7d5b4..7ded479f8 100644
--- a/extralit-server/src/extralit_server/contexts/accounts.py
+++ b/extralit-server/src/extralit_server/contexts/accounts.py
@@ -21,7 +21,7 @@
 from sqlalchemy.orm import selectinload
 
 from extralit_server.contexts import datasets
-from extralit_server.database import retry_db_operation
+from extralit_server.database import db_retry_policy
 from extralit_server.enums import UserRole
 from extralit_server.errors.future import NotUniqueError, UnprocessableEntityError
 from extralit_server.models import User, Workspace, WorkspaceUser
@@ -86,34 +86,23 @@ async def delete_workspace(db: AsyncSession, workspace: Workspace):
     return await workspace.delete(db)
 
 
+@db_retry_policy
 async def user_exists(db: AsyncSession, user_id: UUID) -> bool:
-    @retry_db_operation(max_retries=3, delay=0.1, backoff=2.0)
-    async def _execute_query():
-        return await db.scalar(select(exists().where(User.id == user_id)))
-
-    return await _execute_query()
+    return await db.scalar(select(exists().where(User.id == user_id)))
 
 
+@db_retry_policy
 async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
-    @retry_db_operation(max_retries=3, delay=0.1, backoff=2.0)
-    async def _execute_query():
-        result = await db.execute(
-            select(User)
-            .filter(func.lower(User.username) == func.lower(username))
-            .options(selectinload(User.workspaces))
-        )
-        return result.scalar_one_or_none()
-
-    return await _execute_query()
+    result = await db.execute(
+        select(User).filter(func.lower(User.username) == func.lower(username)).options(selectinload(User.workspaces))
+    )
+    return result.scalar_one_or_none()
 
 
+@db_retry_policy
 async def get_user_by_api_key(db: AsyncSession, api_key: str) -> User | None:
-    @retry_db_operation(max_retries=3, delay=0.1, backoff=2.0)
-    async def _execute_query():
-        result = await db.execute(select(User).where(User.api_key == api_key).options(selectinload(User.workspaces)))
-        return result.scalar_one_or_none()
-
-    return await _execute_query()
+    result = await db.execute(select(User).where(User.api_key == api_key).options(selectinload(User.workspaces)))
+    return result.scalar_one_or_none()
 
 
 async def list_users(db: "AsyncSession") -> Sequence[User]:
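One caveat that applies to the simplified accounts.py helpers above (and applied equally to the old nested _execute_query wrappers): the retry re-invokes the function with the same AsyncSession, and after a failed statement SQLAlchemy usually requires a rollback before that session accepts new SQL. A hedged sketch of guarding for this inside a retried helper, illustrative only and not something the patch does (count_users is invented):

    from sqlalchemy import func, select
    from sqlalchemy.exc import OperationalError
    from sqlalchemy.ext.asyncio import AsyncSession

    from extralit_server.database import db_retry_policy
    from extralit_server.models import User


    @db_retry_policy
    async def count_users(db: AsyncSession) -> int:
        try:
            return await db.scalar(select(func.count(User.id)))
        except OperationalError:
            # Clear the failed transaction so the next attempt can issue SQL on this session,
            # then re-raise so the retry policy decides whether to try again.
            await db.rollback()
            raise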
diff --git a/extralit-server/src/extralit_server/contexts/distribution.py b/extralit-server/src/extralit_server/contexts/distribution.py
index 71c0802e5..733134bfb 100644
--- a/extralit-server/src/extralit_server/contexts/distribution.py
+++ b/extralit-server/src/extralit_server/contexts/distribution.py
@@ -14,11 +14,10 @@
 
 from uuid import UUID
 
-import backoff
-import sqlalchemy
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.orm import selectinload
+from tenacity import retry, stop_after_delay, wait_exponential
 
 from extralit_server.database import _get_async_db
 from extralit_server.enums import DatasetDistributionStrategy, RecordStatus
@@ -41,7 +40,7 @@ async def unsafe_update_records_status(db: AsyncSession, records: list[Record]):
         await _update_record_status(db, record)
 
 
-@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR)
+@retry(stop=stop_after_delay(MAX_TIME_RETRY_SQLALCHEMY_ERROR), wait=wait_exponential(multiplier=1, min=1, max=15))
 async def update_record_status(search_engine: SearchEngine, record_id: UUID) -> Record:
     async for db in _get_async_db(isolation_level="SERIALIZABLE"):
         record = await Record.get_or_raise(
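Two details of the distribution.py change are worth noting. Because update_record_status acquires its session inside the function body (async for db in _get_async_db(isolation_level="SERIALIZABLE")), every retry starts a fresh SERIALIZABLE transaction, which is exactly what retrying serialization conflicts requires. The old decorator, however, retried only sqlalchemy.exc.SQLAlchemyError and re-raised it when the time budget ran out; if that scoping should be preserved, tenacity can express it directly (sketch only; the constant value is assumed for the demo, the real code should reuse the module's MAX_TIME_RETRY_SQLALCHEMY_ERROR):

    from sqlalchemy.exc import SQLAlchemyError
    from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

    MAX_TIME_RETRY_SQLALCHEMY_ERROR = 15  # assumed value for the sketch

    retry_on_sqlalchemy_error = retry(
        retry=retry_if_exception_type(SQLAlchemyError),          # keep the old exception scoping
        stop=stop_after_delay(MAX_TIME_RETRY_SQLALCHEMY_ERROR),
        wait=wait_exponential(multiplier=1, min=1, max=15),
        reraise=True,                                            # surface the SQLAlchemyError, as backoff did
    )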
diff --git a/extralit-server/src/extralit_server/database.py b/extralit-server/src/extralit_server/database.py
index 584f9105a..4463082e9 100644
--- a/extralit-server/src/extralit_server/database.py
+++ b/extralit-server/src/extralit_server/database.py
@@ -12,12 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import asyncio
 import logging
 import os
 from collections import OrderedDict
-from collections.abc import AsyncGenerator, Callable, Generator
-from functools import wraps
+from collections.abc import AsyncGenerator, Generator
 from typing import TypeVar
 
 from sqlalchemy import create_engine, event, make_url
@@ -26,14 +24,49 @@
 from sqlalchemy.exc import DBAPIError, DisconnectionError, OperationalError, TimeoutError
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
 from sqlalchemy.orm import Session, scoped_session, sessionmaker
+from tenacity import before_sleep_log, retry, retry_if_exception, stop_after_attempt, wait_exponential
 
 import extralit_server
 from extralit_server.settings import settings
 
+try:
+    import asyncpg
+
+    ASYNCPG_AVAILABLE = True
+except ImportError:
+    ASYNCPG_AVAILABLE = False
+    asyncpg = None
+
 logger = logging.getLogger(__name__)
 
 T = TypeVar("T")
 
+
+def _log_connection_pool_status():
+    """Log current database connection pool status for debugging."""
+    try:
+        if settings.database_is_postgresql:
+            # Log async engine pool status
+            pool = async_engine.pool
+            # SQLAlchemy async pool does not expose sync Pool API (size, checkedin, etc.)
+            logger.info(f"Async connection pool status: class={pool.__class__.__name__}, repr={pool}")
+
+            # Log sync engine pool status
+            sync_pool = sync_engine.pool
+            logger.info(
+                f"Sync connection pool status: "
+                f"pool_size={sync_pool.size()}, "
+                f"checkedin={sync_pool.checkedin()}, "
+                f"checkedout={sync_pool.checkedout()}, "
+                f"overflow={sync_pool.overflow()}"
+            )
+        else:
+            logger.info("Using SQLite database (no connection pooling)")
+    except Exception as e:
+        logger.warning(f"Failed to log connection pool status: {e}")
+
+
 ALEMBIC_CONFIG_FILE = os.path.normpath(os.path.join(os.path.dirname(extralit_server.__file__), "alembic.ini"))
 TAGGED_REVISIONS = OrderedDict(
     {
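A small aside on _log_connection_pool_status above: the QueuePool counters on the sync engine are methods (size(), checkedin(), checkedout(), overflow()), and every built-in SQLAlchemy pool also exposes status(), which returns a ready-made one-line summary. A minimal sketch of the simpler variant (illustrative; the in-memory SQLite engine is only there to make the snippet runnable):

    from sqlalchemy import create_engine
    from sqlalchemy.engine import Engine


    def pool_summary(engine: Engine) -> str:
        pool = engine.pool
        # status() is implemented by the built-in pool classes and avoids reaching
        # for the individual counters one by one.
        return f"{pool.__class__.__name__}: {pool.status()}"


    if __name__ == "__main__":
        print(pool_summary(create_engine("sqlite://")))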
@@ -105,79 +138,67 @@ async def _get_async_db(isolation_level: IsolationLevel | None = None) -> AsyncG
         await db.close()
 
 
-def retry_db_operation(max_retries: int = 3, delay: float = 0.1, backoff: float = 2.0):
-    """
-    Decorator to retry database operations on connection failures.
-
-    Args:
-        max_retries: Maximum number of retry attempts
-        delay: Initial delay between retries in seconds
-        backoff: Multiplier for delay after each retry
-    """
-
-    def decorator(func: Callable[..., T]) -> Callable[..., T]:
-        @wraps(func)
-        async def async_wrapper(*args, **kwargs) -> T:
-            last_exception = None
-            current_delay = delay
-
-            for attempt in range(max_retries + 1):
-                try:
-                    return await func(*args, **kwargs)
-                except (
-                    DBAPIError,
-                    DisconnectionError,
-                    OperationalError,
-                    TimeoutError,
-                    ConnectionRefusedError,
-                    OSError,
-                ) as e:
-                    last_exception = e
-
-                    # Check if this is a connection-related error
-                    error_msg = str(e).lower()
-                    if any(
-                        msg in error_msg
-                        for msg in [
-                            "connection was closed",
-                            "connection does not exist",
-                            "connection timed out",
-                            "connection refused",
-                            "connection reset",
-                            "broken pipe",
-                            "network is unreachable",
-                            "no route to host",
-                            "connection aborted",
-                            "queuepool limit",  # Pool exhaustion
-                            "timeout 30.00",  # Pool timeout
-                            "errno 111",  # Connection refused errno
-                            "errno 110",  # Connection timed out errno
-                        ]
-                    ):
-                        if attempt < max_retries:
-                            logger.warning(
-                                f"Database connection error on attempt {attempt + 1}/{max_retries + 1}: {e}. "
-                                f"Retrying in {current_delay:.2f}s..."
-                            )
-                            await asyncio.sleep(current_delay)
-                            current_delay *= backoff
-                            continue
-
-                    # Re-raise non-connection errors immediately
-                    raise
-                except Exception:
-                    # Re-raise non-database errors immediately
-                    raise
-
-            # If all retries failed, raise the last exception
-            if last_exception:
-                logger.error(f"Database operation failed after {max_retries + 1} attempts: {last_exception}")
-                raise last_exception
-
-        @wraps(func)
-        def sync_wrapper(*args, **kwargs) -> T:
-            return func(*args, **kwargs)
-
-        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
-
-    return decorator
+def is_db_connection_error(exception):
+    """Custom filter to only retry on actual connection loss, not bad SQL syntax."""
+    error_msg = str(exception).lower()
+
+    # Check for standard SQLAlchemy connection errors
+    if isinstance(
+        exception, OperationalError | DBAPIError | DisconnectionError | TimeoutError | ConnectionRefusedError | OSError
+    ):
+        return True
+
+    # Check for asyncpg-specific connection errors
+    if ASYNCPG_AVAILABLE and asyncpg is not None:
+        try:
+            if isinstance(exception, asyncpg.exceptions.InternalServerError):
+                if (
+                    "maxclientsinsessionmode" in error_msg  # "MaxClientsInSessionMode", matched against the lowercased message
+                    or "max clients reached" in error_msg
+                    or "pool_size" in error_msg
+                ):
+                    return True
+        except AttributeError:
+            pass
+
+    # Check for other connection-related error messages
+    if any(
+        msg in error_msg
+        for msg in [
+            "connection was closed",
+            "connection does not exist",
+            "connection timed out",
+            "connection refused",
+            "connection reset",
+            "broken pipe",
+            "network is unreachable",
+            "no route to host",
+            "connection aborted",
+            "queuepool limit",  # Pool exhaustion
+            "timeout 30.00",  # Pool timeout
+            "errno 111",  # Connection refused errno
+            "errno 110",  # Connection timed out errno
+        ]
+    ):
+        return True
+
+    return False
+
+
+def before_retry_log_and_pool_status(retry_state):
+    """Log both retry attempt and connection pool status."""
+    before_sleep_log(logger, logging.WARNING)(retry_state)
+    _log_connection_pool_status()
+
+
+# Reusable decorator
+db_retry_policy = retry(
+    # Retry only on connection errors; retry_if_exception hands the predicate the raised exception
+    retry=retry_if_exception(is_db_connection_error),
+    # Wait 0.1s, then 0.2s, then 0.4s... up to 2 seconds
+    wait=wait_exponential(multiplier=0.1, min=0.1, max=2),
+    # Stop after 3 attempts
+    stop=stop_after_attempt(3),
+    # Log before retrying and check pool status
+    before_sleep=before_retry_log_and_pool_status,
+)
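Two notes on the policy above. The retry= option expects either one of tenacity's retry_* helpers or a callable that is given the RetryCallState, so the exception-based filter is wrapped in retry_if_exception, which passes the predicate the raised exception itself. Also, unlike the removed retry_db_operation (which re-raised the last exception), this policy raises tenacity.RetryError once the three attempts are exhausted; callers that need the original error can unwrap it from the RetryError, or reraise=True could be added to the policy. A short sketch of the exhausted case (illustrative; always_down is invented, and importing extralit_server.database assumes the server settings are available):

    import asyncio

    from tenacity import RetryError

    from extralit_server.database import db_retry_policy


    @db_retry_policy
    async def always_down():
        raise ConnectionRefusedError("connection refused")


    async def main():
        try:
            await always_down()
        except RetryError as err:  # raised after the 3rd attempt, roughly 0.3s of exponential waits later
            print("gave up:", err.last_attempt.exception())


    if __name__ == "__main__":
        asyncio.run(main())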