11 changes: 1 addition & 10 deletions extralit-server/pdm.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions extralit-server/pyproject.toml
@@ -28,7 +28,7 @@ dependencies = [
"opensearch-py ~= 2.0.0",
"elasticsearch8[async] ~= 8.7.0",
"brotli-asgi ~= 1.4.0",
"backoff ~= 2.2.0",
"tenacity>=9.1.2",
# Database dependencies
"alembic ~= 1.13.0",
"SQLAlchemy ~= 2.0.0",
@@ -194,12 +194,13 @@ ignore = [
"B904", # exception chaining (will phase in later)
"UP007", # | union syntax modernization
"UP045", # Optional[...] -> X | None modernization
"UP038",
"B027", # empty abstract method not marked abstract
"B024", # abstract base class without abstract methods
"B017", # broad exception asserts in tests
"RUF012", # mutable class var typing inference
"B008", # FastAPI Depends pattern
"FAST002" # FastAPI Depends suggestion
"FAST002", # FastAPI Depends suggestion
]

[tool.ruff.lint.per-file-ignores]
6 changes: 3 additions & 3 deletions extralit-server/src/extralit_server/_app.py
@@ -25,7 +25,6 @@
from typing import Annotated
from urllib.parse import urlencode

import backoff
import redis
from brotli_asgi import BrotliMiddleware
from fastapi import FastAPI, Query, Request
@@ -34,6 +33,7 @@
from starlette.datastructures import URL
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import HTMLResponse, RedirectResponse
from tenacity import retry, stop_after_delay, wait_exponential

from extralit_server import helpers
from extralit_server._version import __version__ as extralit_version
@@ -361,7 +361,7 @@ async def configure_search_engine():
logging.getLogger("opensearch").setLevel(logging.ERROR)
logging.getLogger("opensearch_transport").setLevel(logging.ERROR)

@backoff.on_exception(backoff.expo, ConnectionError, max_time=60)
@retry(stop=stop_after_delay(60), wait=wait_exponential(multiplier=1, min=1, max=60))
async def ping_search_engine():
async for search_engine in get_search_engine():
if not await search_engine.ping():
@@ -376,7 +376,7 @@ async def ping_search_engine():


def configure_redis():
@backoff.on_exception(backoff.expo, ConnectionError, max_time=60)
@retry(stop=stop_after_delay(60), wait=wait_exponential(multiplier=1, min=1, max=60))
def ping_redis():
try:
REDIS_CONNECTION.ping()
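Migration note: backoff's on_exception(backoff.expo, ConnectionError, max_time=60) retried only on ConnectionError, whereas a bare tenacity @retry(...) as written above retries on any exception. A minimal sketch of a strictly equivalent decorator, assuming the exception filter should be preserved (not part of this diff):

from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

# stop_after_delay replaces backoff's max_time, wait_exponential replaces
# backoff.expo, and retry_if_exception_type restores the ConnectionError-only
# filter that a bare @retry(...) drops.
@retry(
    retry=retry_if_exception_type(ConnectionError),
    stop=stop_after_delay(60),
    wait=wait_exponential(multiplier=1, min=1, max=60),
)
def ping_service() -> None:
    ...  # hypothetical probe body, e.g. REDIS_CONNECTION.ping()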
145 changes: 145 additions & 0 deletions extralit-server/src/extralit_server/api/handlers/v1/workspaces.py
@@ -359,6 +359,151 @@ async def workspace_doctor(
)
)

# Check 5: Database connections health with autofix
try:
import asyncio

from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError

# Import the async engine to access database URL and dispose method
from extralit_server.database import async_engine

# Get database URL and detect type
db_url = str(async_engine.url)
is_postgresql = "postgresql" in db_url.lower()
is_sqlite = "sqlite" in db_url.lower()

async def check_db_health():
if is_postgresql:
# A. Liveness Check (The most important part)
# Simple query to prove the TCP pipe is open.
await db.execute(text("SELECT 1"))

# B. Deep Inspection (Optional - only if Liveness passes)
active_connections_query = text("""
SELECT
count(*) as active_connections,
count(*) filter (where state = 'idle in transaction' and now() - state_change > interval '1 minute') as stale_transaction_connections
FROM pg_stat_activity
WHERE datname = current_database()
""")

result = await db.execute(active_connections_query)
conn_stats = result.first()

if conn_stats:
stale_txn = conn_stats.stale_transaction_connections
active = conn_stats.active_connections

if stale_txn > 0:
# Idle in transaction is bad: such sessions hold locks and block other work.
checks.append(
WorkspaceDoctorCheckResult(
check_name="db_transaction_health",
status="warning",
message=f"Found {stale_txn} connections idle in transaction.",
)
)
else:
checks.append(
WorkspaceDoctorCheckResult(
check_name="db_connection_health",
status="ok",
message=f"Pool healthy. Active DB sessions: {active}",
)
)

elif is_sqlite:
# SQLite doesn't have connection pooling like PostgreSQL
# Just check if we can execute a simple query
simple_query = text("SELECT 1 as test")
result = await db.execute(simple_query)
test_result = result.scalar()

if test_result == 1:
checks.append(
WorkspaceDoctorCheckResult(
check_name="database_connections",
status="ok",
message="SQLite database connection is healthy (SQLite uses file-based connections, no pooling)",
fixed=False,
)
)
else:
checks.append(
WorkspaceDoctorCheckResult(
check_name="database_connections",
status="error",
message="SQLite database connection test failed",
fixed=False,
)
)
else:
# Other database types - skip detailed connection monitoring
checks.append(
WorkspaceDoctorCheckResult(
check_name="database_connections",
status="ok",
message=f"Database connection check skipped for {db_url.split('://')[0]} (unsupported for detailed monitoring)",
fixed=False,
)
)

# SAFETY TIMEOUT: If DB doesn't answer in 3s, the connection is dead.
# This prevents the doctor check from hanging for 15 mins.
await asyncio.wait_for(check_db_health(), timeout=3.0)

except (asyncio.TimeoutError, SQLAlchemyError, OSError) as e:
# Handle database connectivity issues
if autofix:
# THE AUTOFIX LOGIC
# If we timed out or got a connection error, the pool is likely stale.

error_msg = f"Database unresponsive (Timeout/Error). Resetting connection pool. Error: {e!s}"

# DISPOSE THE POOL
# This closes all internal sockets. The next request will force a fresh handshake.
try:
await async_engine.dispose()
checks.append(
WorkspaceDoctorCheckResult(
check_name="database_connections",
status="warning", # Warning because we had to reset
message=error_msg,
fixed=True, # We successfully reset the pool
)
)
except Exception as dispose_error:
error_msg += f" (Failed to dispose pool: {dispose_error!s})"
checks.append(
WorkspaceDoctorCheckResult(
check_name="database_connections",
status="error",
message=error_msg,
fixed=False,
)
)
else:
# Just report the issue without fixing
checks.append(
WorkspaceDoctorCheckResult(
check_name="database_connections",
status="error",
message=f"Database unresponsive (Timeout/Error). Run with --autofix to automatically reset connection pool. Error: {e!s}",
fixed=False,
)
)
except Exception as e:
checks.append(
WorkspaceDoctorCheckResult(
check_name="database_connections",
status="warning",
message=f"Could not check database connections: {e!s}",
fixed=False,
)
)

# Determine overall status
has_errors = any(check.status == "error" for check in checks)
has_fixed = any(check.fixed for check in checks)
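The autofix branch above pairs a hard 3-second timeout with an engine-level pool reset. A self-contained sketch of that pattern (the DSN and function names here are illustrative, not from this PR):

import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")  # placeholder DSN


async def probe_and_reset() -> bool:
    async def _ping() -> None:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))

    try:
        # If the server does not answer within 3 seconds, assume the pooled
        # sockets are dead rather than waiting out long TCP timeouts.
        await asyncio.wait_for(_ping(), timeout=3.0)
        return True
    except (asyncio.TimeoutError, OSError):
        # dispose() closes every pooled connection; the next checkout performs
        # a fresh handshake instead of reusing a half-dead socket.
        await engine.dispose()
        return False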
@@ -24,7 +24,7 @@
from sqlalchemy.orm import selectinload

from extralit_server.cli.rich import echo_in_panel
from extralit_server.database import AsyncSessionLocal, retry_db_operation
from extralit_server.database import AsyncSessionLocal, db_retry_policy
from extralit_server.models import Dataset, Record, Response, Suggestion
from extralit_server.search_engine import SearchEngine, get_search_engine

@@ -33,7 +33,7 @@ class Reindexer:
YIELD_PER = 100

@classmethod
@retry_db_operation(max_retries=3, delay=1.0, backoff=2.0)
@db_retry_policy
async def reindex_dataset(cls, db: AsyncSession, search_engine: SearchEngine, dataset_id: UUID) -> Dataset:
dataset = (
await db.execute(
@@ -55,7 +55,7 @@ async def reindex_dataset(cls, db: AsyncSession, search_engine: SearchEngine, dataset_id: UUID) -> Dataset:

@classmethod
async def reindex_datasets(cls, db: AsyncSession, search_engine: SearchEngine) -> AsyncGenerator[Dataset, None]:
@retry_db_operation(max_retries=3, delay=1.0, backoff=2.0)
@db_retry_policy
async def _get_datasets_batch(offset: int = 0, limit: int = cls.YIELD_PER):
"""Get a batch of datasets with retry logic"""
return await db.execute(
@@ -109,7 +109,7 @@ async def _get_datasets_batch(offset: int = 0, limit: int = cls.YIELD_PER):
async def reindex_dataset_records(
cls, db: AsyncSession, search_engine: SearchEngine, dataset: Dataset
) -> AsyncGenerator[list[Record], None]:
@retry_db_operation(max_retries=3, delay=1.0, backoff=2.0)
@db_retry_policy
async def _get_records_batch(offset: int = 0, limit: int = cls.YIELD_PER):
"""Get a batch of records with retry logic"""
return await db.execute(
@@ -162,12 +162,12 @@ async def _get_records_batch(offset: int = 0, limit: int = cls.YIELD_PER):
continue

@classmethod
@retry_db_operation(max_retries=3, delay=0.5, backoff=2.0)
@db_retry_policy
async def count_datasets(cls, db: AsyncSession) -> int:
return (await db.execute(select(func.count(Dataset.id)))).scalar_one()

@classmethod
@retry_db_operation(max_retries=3, delay=0.5, backoff=2.0)
@db_retry_policy
async def count_dataset_records(cls, db: AsyncSession, dataset: Dataset) -> int:
return (await db.execute(select(func.count(Record.id)).filter_by(dataset_id=dataset.id))).scalar_one()

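db_retry_policy collapses the parametrized retry_db_operation(max_retries=..., delay=..., backoff=...) calls into one shared policy. Its definition lives in extralit_server/database.py and is not shown in this diff; a plausible tenacity-based sketch, with every parameter an assumption:

from sqlalchemy.exc import OperationalError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

# Assumed shape only: tenacity's retry(...) returns a plain decorator, which is
# why the call sites above can apply @db_retry_policy without arguments.
db_retry_policy = retry(
    retry=retry_if_exception_type(OperationalError),  # assumption: transient errors only
    stop=stop_after_attempt(3),                       # roughly the old max_retries=3
    wait=wait_exponential(multiplier=0.5),            # roughly delay=0.5 doubling per try
    reraise=True,
)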
33 changes: 11 additions & 22 deletions extralit-server/src/extralit_server/contexts/accounts.py
@@ -21,7 +21,7 @@
from sqlalchemy.orm import selectinload

from extralit_server.contexts import datasets
from extralit_server.database import retry_db_operation
from extralit_server.database import db_retry_policy
from extralit_server.enums import UserRole
from extralit_server.errors.future import NotUniqueError, UnprocessableEntityError
from extralit_server.models import User, Workspace, WorkspaceUser
@@ -86,34 +86,23 @@ async def delete_workspace(db: AsyncSession, workspace: Workspace):
return await workspace.delete(db)


@db_retry_policy
async def user_exists(db: AsyncSession, user_id: UUID) -> bool:
@retry_db_operation(max_retries=3, delay=0.1, backoff=2.0)
async def _execute_query():
return await db.scalar(select(exists().where(User.id == user_id)))

return await _execute_query()
return await db.scalar(select(exists().where(User.id == user_id)))


@db_retry_policy
async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
@retry_db_operation(max_retries=3, delay=0.1, backoff=2.0)
async def _execute_query():
result = await db.execute(
select(User)
.filter(func.lower(User.username) == func.lower(username))
.options(selectinload(User.workspaces))
)
return result.scalar_one_or_none()

return await _execute_query()
result = await db.execute(
select(User).filter(func.lower(User.username) == func.lower(username)).options(selectinload(User.workspaces))
)
return result.scalar_one_or_none()


@db_retry_policy
async def get_user_by_api_key(db: AsyncSession, api_key: str) -> User | None:
@retry_db_operation(max_retries=3, delay=0.1, backoff=2.0)
async def _execute_query():
result = await db.execute(select(User).where(User.api_key == api_key).options(selectinload(User.workspaces)))
return result.scalar_one_or_none()

return await _execute_query()
result = await db.execute(select(User).where(User.api_key == api_key).options(selectinload(User.workspaces)))
return result.scalar_one_or_none()


async def list_users(db: "AsyncSession") -> Sequence[User]:
@@ -14,11 +14,10 @@

from uuid import UUID

import backoff
import sqlalchemy
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from tenacity import retry, stop_after_delay, wait_exponential

from extralit_server.database import _get_async_db
from extralit_server.enums import DatasetDistributionStrategy, RecordStatus
@@ -41,7 +40,7 @@ async def unsafe_update_records_status(db: AsyncSession, records: list[Record]):
await _update_record_status(db, record)


@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR)
@retry(stop=stop_after_delay(MAX_TIME_RETRY_SQLALCHEMY_ERROR), wait=wait_exponential(multiplier=1, min=1, max=15))
async def update_record_status(search_engine: SearchEngine, record_id: UUID) -> Record:
async for db in _get_async_db(isolation_level="SERIALIZABLE"):
record = await Record.get_or_raise(
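One behavioral note: the removed backoff decorator retried only sqlalchemy.exc.SQLAlchemyError, which matters here because SERIALIZABLE transactions are expected to fail on serialization conflicts and are safe to retry, while the bare @retry(...) above retries every exception. A sketch restoring the filter (the constant's value is an assumption; it is not shown in this diff):

import sqlalchemy
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

MAX_TIME_RETRY_SQLALCHEMY_ERROR = 15  # assumption: real value defined elsewhere in the module


@retry(
    retry=retry_if_exception_type(sqlalchemy.exc.SQLAlchemyError),  # restore old filter
    stop=stop_after_delay(MAX_TIME_RETRY_SQLALCHEMY_ERROR),
    wait=wait_exponential(multiplier=1, min=1, max=15),
)
async def update_record_status_filtered() -> None:
    ...  # hypothetical stand-in for the real status update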