Skip to content
73 changes: 73 additions & 0 deletions backend/migrations/versions/20260219_0001_add_pr_creators_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Add composite index for pr_creators DISTINCT ON query.

Revision ID: f5g6h7i8j9k0
Revises: e4f5g6h7i8j9
Create Date: 2026-02-19 00:01:00.000000

Adds a partial composite index to optimize the pr_creators CTE query pattern:
DISTINCT ON (repository, pr_number) ... ORDER BY repository, pr_number, created_at ASC

This query was timing out (>60s) on the contributors endpoint due to full table sort.
The index enables PostgreSQL to satisfy DISTINCT ON via index scan instead of sort.
"""

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
revision = "f5g6h7i8j9k0" # pragma: allowlist secret
down_revision = "e4f5g6h7i8j9" # pragma: allowlist secret
branch_labels = None
depends_on = None


def upgrade() -> None:
"""Create composite index for DISTINCT ON (repository, pr_number) queries.

Uses CONCURRENTLY to avoid locking the table during index creation.
CONCURRENTLY cannot run inside a transaction, so the active Alembic
transaction is committed first.
"""
conn = op.get_bind()
# CONCURRENTLY cannot run inside a transaction — commit Alembic's active transaction
conn.execute(text("COMMIT"))
# Drop any existing invalid index left by a prior failed CONCURRENTLY build
conn.execute(
text(
"""
DROP INDEX CONCURRENTLY IF EXISTS ix_webhooks_repo_pr_number_created_at
"""
)
)
conn.execute(
text(
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_webhooks_repo_pr_number_created_at
ON webhooks (repository, pr_number, created_at ASC)
WHERE pr_number IS NOT NULL
"""
)
)
# Re-open a transaction so Alembic can update alembic_version
conn.execute(text("BEGIN"))


def downgrade() -> None:
"""Drop the composite index.

Uses CONCURRENTLY to avoid locking the table during index removal.
CONCURRENTLY cannot run inside a transaction, so the active Alembic
transaction is committed first.
"""
conn = op.get_bind()
# CONCURRENTLY cannot run inside a transaction — commit Alembic's active transaction
conn.execute(text("COMMIT"))
conn.execute(
text(
"""
DROP INDEX CONCURRENTLY IF EXISTS ix_webhooks_repo_pr_number_created_at
"""
)
)
# Re-open a transaction so Alembic can update alembic_version
conn.execute(text("BEGIN"))
7 changes: 7 additions & 0 deletions backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class Webhook(Base):
__table_args__ = (
Index("ix_webhooks_repository_created_at", "repository", "created_at"),
Index("ix_webhooks_repository_event_type", "repository", "event_type"),
Index(
"ix_webhooks_repo_pr_number_created_at",
"repository",
"pr_number",
"created_at",
postgresql_where=text("pr_number IS NOT NULL"),
),
)

id: Mapped[UUID] = mapped_column(
Expand Down
115 changes: 115 additions & 0 deletions tests/test_migration_add_pr_creators_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Tests for the 20260219_0001_add_pr_creators_index migration.

Tests migration revision chain integrity and upgrade/downgrade operations
for the pr_creators DISTINCT ON composite index.
"""

import importlib
import types
from typing import Literal
from unittest.mock import MagicMock, patch

INDEX_NAME = "ix_webhooks_repo_pr_number_created_at"


class TestPrCreatorsIndexMigration:
"""Tests for the 20260219_0001_add_pr_creators_index migration."""

@staticmethod
def _load_migration() -> types.ModuleType:
"""Load the migration module."""
return importlib.import_module("backend.migrations.versions.20260219_0001_add_pr_creators_index")

def _run_migration_and_get_sql(
self, action: Literal["upgrade", "downgrade"], expected_call_count: int, sql_index: int
) -> str:
"""Run a migration action and return the DDL SQL string.

Args:
action: Migration action to run ("upgrade" or "downgrade").
expected_call_count: Expected number of execute calls.
sql_index: Index of the DDL SQL call in the call args list.

Returns:
The DDL SQL string at the specified index.
"""
migration = self._load_migration()
mock_bind = MagicMock()

with patch("alembic.op.get_bind", return_value=mock_bind):
getattr(migration, action)()

assert mock_bind.execute.call_count == expected_call_count
commit_sql = str(mock_bind.execute.call_args_list[0][0][0].text).strip()
assert commit_sql == "COMMIT"
begin_sql = str(mock_bind.execute.call_args_list[-1][0][0].text).strip()
assert begin_sql == "BEGIN"
return str(mock_bind.execute.call_args_list[sql_index][0][0].text).strip()

def _run_upgrade_and_get_sql(self) -> str:
"""Run the upgrade migration and return the DDL SQL string."""
return self._run_migration_and_get_sql("upgrade", expected_call_count=4, sql_index=2)

def _run_downgrade_and_get_sql(self) -> str:
"""Run the downgrade migration and return the DDL SQL string."""
return self._run_migration_and_get_sql("downgrade", expected_call_count=3, sql_index=1)

def test_revision_id(self) -> None:
"""Test that the migration has the expected revision ID."""
migration = self._load_migration()
assert migration.revision == "f5g6h7i8j9k0"

def test_down_revision_matches_previous_migration(self) -> None:
"""Test that down_revision points to the previous migration."""
migration = self._load_migration()
assert migration.down_revision == "e4f5g6h7i8j9"

def test_revision_chain_links_to_remove_cross_team_columns(self) -> None:
"""Test that the migration chains after the remove_cross_team_columns migration."""
previous_migration = importlib.import_module(
"backend.migrations.versions.20251210_0001_remove_cross_team_columns"
)
current_migration = self._load_migration()
assert current_migration.down_revision == previous_migration.revision

def test_branch_labels_is_none(self) -> None:
"""Test that branch_labels is None (linear migration chain)."""
migration = self._load_migration()
assert migration.branch_labels is None

def test_depends_on_is_none(self) -> None:
"""Test that depends_on is None (no cross-branch dependencies)."""
migration = self._load_migration()
assert migration.depends_on is None

def test_upgrade_creates_correct_index(self) -> None:
"""Test that upgrade creates the ix_webhooks_repo_pr_number_created_at index."""
executed_sql = self._run_upgrade_and_get_sql()
assert INDEX_NAME in executed_sql
assert "CONCURRENTLY" in executed_sql
assert "IF NOT EXISTS" in executed_sql
assert "pr_number IS NOT NULL" in executed_sql

def test_upgrade_targets_webhooks_table(self) -> None:
"""Test that upgrade creates the index on the webhooks table."""
executed_sql = self._run_upgrade_and_get_sql()
assert "ON webhooks" in executed_sql

def test_upgrade_index_column_order(self) -> None:
"""Test that the upgrade SQL specifies columns in the correct order."""
executed_sql = self._run_upgrade_and_get_sql()
assert "repository, pr_number, created_at ASC" in executed_sql

def test_upgrade_drops_invalid_index_before_create(self) -> None:
"""Test that upgrade drops any existing invalid index before creating."""
drop_sql = self._run_migration_and_get_sql("upgrade", expected_call_count=4, sql_index=1)
assert "DROP INDEX CONCURRENTLY" in drop_sql
assert "IF EXISTS" in drop_sql
assert INDEX_NAME in drop_sql

def test_downgrade_drops_correct_index(self) -> None:
"""Test that downgrade drops the ix_webhooks_repo_pr_number_created_at index."""
executed_sql = self._run_downgrade_and_get_sql()
assert "DROP INDEX CONCURRENTLY" in executed_sql
assert INDEX_NAME in executed_sql
assert "IF EXISTS" in executed_sql
53 changes: 53 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Tests for the Webhook SQLAlchemy model.

Tests model definitions including:
- Table arguments and index definitions
"""

from sqlalchemy import Index

from backend.models import Webhook

INDEX_NAME = "ix_webhooks_repo_pr_number_created_at"


class TestWebhookCompositeIndex:
"""Tests for the composite index on the Webhook model."""

@staticmethod
def _find_index(name: str) -> Index:
for arg in Webhook.__table_args__:
if isinstance(arg, Index) and arg.name == name:
return arg
raise AssertionError(f"Index {name!r} not found")

def test_table_args_contains_composite_index(self) -> None:
"""Test that Webhook.__table_args__ includes the composite index."""
index_names = [arg.name for arg in Webhook.__table_args__ if isinstance(arg, Index)]
assert INDEX_NAME in index_names

def test_composite_index_columns(self) -> None:
"""Test that the composite index covers repository, pr_number, and created_at."""
target_index = self._find_index(INDEX_NAME)

column_names = [col.name for col in target_index.columns]
assert column_names == ["repository", "pr_number", "created_at"]

def test_composite_index_has_partial_where_clause(self) -> None:
"""Test that the composite index has a WHERE pr_number IS NOT NULL clause."""
target_index = self._find_index(INDEX_NAME)

dialect_options = target_index.dialect_options.get("postgresql", {})
where_clause = dialect_options.get("where")
assert where_clause is not None, "Index missing postgresql_where clause"
assert "pr_number IS NOT NULL" in str(where_clause.text)

def test_all_composite_index_names(self) -> None:
"""Test all composite index names in Webhook.__table_args__."""
expected_names = {
"ix_webhooks_repository_created_at",
"ix_webhooks_repository_event_type",
INDEX_NAME,
}
actual_names = {arg.name for arg in Webhook.__table_args__ if isinstance(arg, Index)}
assert actual_names == expected_names