Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/intelstream/database/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,23 @@ async def add_message_chunk_metas_batch(self, chunks: list[MessageChunkMeta]) ->
session.add_all(chunks)
await session.commit()

async def count_message_chunk_metas(self) -> int:
    """Return the total number of stored message-chunk metadata rows."""
    stmt = select(func.count()).select_from(MessageChunkMeta)
    async with self.session() as session:
        count = (await session.execute(stmt)).scalar_one()
    return int(count)

async def get_message_chunk_metas_batch(
    self, offset: int = 0, limit: int = 100
) -> list[MessageChunkMeta]:
    """Fetch one page of chunk metadata in a deterministic order.

    Rows are ordered by start timestamp and then id so that repeated
    paginated scans walk the table in a stable sequence.
    """
    stmt = (
        select(MessageChunkMeta)
        .order_by(MessageChunkMeta.start_timestamp.asc(), MessageChunkMeta.id.asc())
        .offset(offset)
        .limit(limit)
    )
    async with self.session() as session:
        rows = await session.execute(stmt)
        return list(rows.scalars().all())

async def get_message_chunk_metas_by_ids(self, chunk_ids: list[str]) -> list[MessageChunkMeta]:
if not chunk_ids:
return []
Expand Down
107 changes: 80 additions & 27 deletions src/intelstream/database/vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import asyncio
import os
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

import structlog
Expand All @@ -26,47 +28,89 @@ class ChunkSearchResult:


class VectorStore:
_ARTICLES_COLLECTION = "articles"
_MESSAGE_CHUNKS_COLLECTION = "message_chunks"

def __init__(self, data_dir: str, dimensions: int = 384) -> None:
    """Store configuration; collections are opened lazily in ``initialize``.

    Args:
        data_dir: Directory under which each collection gets its own subpath.
        dimensions: Embedding vector dimensionality for both collections.
    """
    self._data_dir = data_dir
    self._dimensions = dimensions
    # Collection handles stay None until initialize() opens them.
    self._articles: zvec.Collection | None = None
    self._message_chunks: zvec.Collection | None = None

async def initialize(self) -> None:
    """Ensure the data directory exists and open both vector collections."""
    await asyncio.to_thread(os.makedirs, self._data_dir, exist_ok=True)
    # Open articles first, then message chunks, caching each handle on the
    # attribute that _collection_attr_name maps it to.
    for name in (self._ARTICLES_COLLECTION, self._MESSAGE_CHUNKS_COLLECTION):
        collection = await self._open_or_create_collection(name)
        setattr(self, self._collection_attr_name(name), collection)

def _collection_path(self, collection_name: str) -> str:
    """Return the on-disk location for *collection_name* under the data dir."""
    return str(Path(self._data_dir, collection_name))

def _collection_attr_name(self, collection_name: str) -> str:
    """Map a known collection name to the instance attribute caching its handle.

    Raises:
        ValueError: If *collection_name* is not one of the known collections.
    """
    attr_by_collection = {
        self._ARTICLES_COLLECTION: "_articles",
        self._MESSAGE_CHUNKS_COLLECTION: "_message_chunks",
    }
    if collection_name not in attr_by_collection:
        raise ValueError(f"Unknown collection name: {collection_name}")
    return attr_by_collection[collection_name]

def _build_schema(self, collection_name: str) -> zvec.CollectionSchema:
    """Build the schema for a single-vector collection named *collection_name*.

    The schema declares one FP32 embedding vector sized to this store's
    configured dimensionality.
    """
    # Imported lazily so module import does not require zvec to be installed.
    import zvec

    return zvec.CollectionSchema(
        name=collection_name,
        vectors=zvec.VectorSchema("embedding", zvec.DataType.VECTOR_FP32, self._dimensions),
    )

async def _open_or_create_collection(self, collection_name: str) -> zvec.Collection:
    """Create the collection at its derived path, or open it if it exists.

    Returns:
        The opened ``zvec.Collection`` handle.
    """
    # Imported lazily so module import does not require zvec to be installed.
    import zvec

    path = self._collection_path(collection_name)
    try:
        collection = await asyncio.to_thread(
            zvec.create_and_open,
            path=path,
            schema=self._build_schema(collection_name),
        )
        logger.info("Created new vector collection", collection=collection_name)
        return collection
    except Exception:
        # NOTE(review): this broad except assumes the only creation failure is
        # "collection already exists" — any other error will surface again
        # from zvec.open below. Confirm zvec's exception types if possible.
        collection = await asyncio.to_thread(
            zvec.open,
            path=path,
            option=zvec.CollectionOption(),
        )
        logger.info("Opened existing vector collection", collection=collection_name)
        return collection

async def _recreate_collection(self, collection_name: str) -> zvec.Collection:
    """Destroy the named collection (handle and files) and open a fresh one.

    Returns the newly created collection, which is also cached back onto
    the matching instance attribute.
    """
    attr = self._collection_attr_name(collection_name)
    existing = getattr(self, attr)
    path = self._collection_path(collection_name)

    if existing is not None:
        try:
            await asyncio.to_thread(existing.destroy)
        except Exception:
            # Best-effort: if zvec can't tear it down, fall through and
            # delete the files ourselves below.
            logger.warning(
                "Failed to destroy vector collection cleanly, removing files manually",
                collection=collection_name,
                path=path,
            )
        finally:
            # Drop the stale handle regardless of how destroy() went.
            setattr(self, attr, None)

    if await asyncio.to_thread(os.path.exists, path):
        await asyncio.to_thread(shutil.rmtree, path, True)

    fresh = await self._open_or_create_collection(collection_name)
    setattr(self, attr, fresh)
    return fresh

async def _doc_count(self, collection: zvec.Collection | None) -> int:
    """Read a collection's document count off the event-loop thread.

    Raises:
        RuntimeError: If the collection handle has not been initialized.
    """
    if collection is None:
        raise RuntimeError("VectorStore not initialized")

    def _read_count() -> int:
        return int(collection.stats.doc_count)

    return await asyncio.to_thread(_read_count)

async def upsert_article(self, content_item_id: str, embedding: list[float]) -> None:
import zvec
Expand Down Expand Up @@ -108,6 +152,9 @@ async def delete_article(self, content_item_id: str) -> None:
raise RuntimeError("VectorStore not initialized")
await asyncio.to_thread(self._articles.delete, content_item_id)

async def article_doc_count(self) -> int:
    """Return the number of documents indexed in the articles collection."""
    return await self._doc_count(self._articles)

async def upsert_message_chunk(self, chunk_id: str, embedding: list[float]) -> None:
import zvec

Expand Down Expand Up @@ -149,6 +196,12 @@ async def delete_message_chunks_by_ids(self, chunk_ids: list[str]) -> None:
for chunk_id in chunk_ids:
await asyncio.to_thread(self._message_chunks.delete, chunk_id)

async def message_chunk_doc_count(self) -> int:
    """Return the number of documents indexed in the message-chunks collection."""
    return await self._doc_count(self._message_chunks)

async def recreate_message_chunks_collection(self) -> None:
    """Drop and rebuild the message-chunks collection (all vectors are lost)."""
    await self._recreate_collection(self._MESSAGE_CHUNKS_COLLECTION)

async def close(self) -> None:
if self._articles is not None:
await asyncio.to_thread(self._articles.flush)
Expand Down
87 changes: 85 additions & 2 deletions src/intelstream/discord/cogs/lore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
import re
from contextlib import suppress
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -33,6 +35,7 @@

BUFFER_FLUSH_MINUTES = 5
MAX_DISCORD_MESSAGE_LENGTH = 2000
HEALTH_CHECK_TOPK = 10


def _parse_timeframe(timeframe: str) -> tuple[datetime | None, datetime | None]:
Expand Down Expand Up @@ -89,6 +92,8 @@ def __init__(
self._llm_client: LLMClient | None = None
self._message_buffers: dict[str, list[RawMessage]] = {}
self._chunker: MessageChunker | None = None
self._index_rebuild_task: asyncio.Task[None] | None = None
self._index_rebuild_error: str | None = None

async def cog_load(self) -> None:
self._ingestion_service = MessageIngestionService(
Expand All @@ -111,10 +116,18 @@ async def cog_load(self) -> None:
max_messages=self.bot.settings.lore_chunk_max_messages,
)
self._flush_buffers.start()
self._index_rebuild_task = asyncio.create_task(
self._ensure_message_chunk_index(),
name="lore-index-rebuild",
)
logger.info("Lore cog loaded")

async def cog_unload(self) -> None:
self._flush_buffers.cancel()
if self._index_rebuild_task is not None:
self._index_rebuild_task.cancel()
with suppress(asyncio.CancelledError):
await self._index_rebuild_task
if self._ingestion_service and self._ingestion_service.is_running:
self._ingestion_service.stop_backfill()
await self._flush_all_buffers()
Expand All @@ -135,12 +148,82 @@ async def lore(
channel: discord.TextChannel | None = None, # noqa: ARG002
timeframe: str | None = None, # noqa: ARG002
) -> None:
if self._index_rebuild_task is not None and not self._index_rebuild_task.done():
message = (
"The /lore command is temporarily disabled while the message index is being "
"rebuilt. Check back soon!"
)
elif self._index_rebuild_error is not None:
message = (
"The /lore command is temporarily disabled because the message index needs "
"recovery. Check logs and try again after reindexing completes."
)
else:
message = (
"The /lore command is temporarily disabled while the message index is being "
"built. Check back soon!"
)
await interaction.response.send_message(
"The /lore command is temporarily disabled while the message index is being built. "
"Check back soon!",
message,
ephemeral=True,
)

async def _ensure_message_chunk_index(self) -> None:
    """Startup task: verify the lore vector index and rebuild it if unhealthy.

    Any failure (other than cancellation) is recorded on
    ``self._index_rebuild_error`` so the /lore command can report it.
    """
    if self._ingestion_service is None:
        return

    try:
        total = await self.bot.repository.count_message_chunk_metas()
        if not total:
            logger.info("No stored lore chunks found; skipping vector index rebuild")
            return

        if await self._message_index_is_healthy(total):
            logger.info("Lore message index is healthy", chunks=total)
            return

        logger.warning(
            "Lore message index is unhealthy; rebuilding from stored chunks",
            expected_chunks=total,
        )
        reindexed = await self._ingestion_service.rebuild_vector_index()
        logger.info("Lore message index rebuilt", indexed=reindexed)
    except asyncio.CancelledError:
        # Let task cancellation propagate untouched.
        raise
    except Exception as err:
        self._index_rebuild_error = str(err)
        logger.exception("Failed to rebuild lore message index", error=str(err))

async def _message_index_is_healthy(self, expected_count: int) -> bool:
    """Check the vector index against the database.

    Healthy means: the indexed document count matches *expected_count* AND a
    probe search for one stored chunk's own text returns that chunk among
    the top results.
    """
    doc_count = await self._vector_store.message_chunk_doc_count()
    if doc_count != expected_count:
        logger.warning(
            "Lore message index count mismatch",
            expected=expected_count,
            indexed=doc_count,
        )
        return False

    batch = await self.bot.repository.get_message_chunk_metas_batch(limit=1)
    if not batch:
        # Nothing stored to probe with; count match alone is enough.
        return True

    probe = batch[0]
    embedding = await self._embedding_service.embed_text(probe.text)
    hits = await self._vector_store.search_message_chunks(
        embedding,
        topk=HEALTH_CHECK_TOPK,
    )
    hit_ids = [hit.chunk_id for hit in hits]
    if probe.id in hit_ids:
        return True

    logger.warning(
        "Lore message index probe failed",
        sample_chunk_id=probe.id,
        result_ids=hit_ids,
    )
    return False

@commands.Cog.listener("on_message")
async def on_message(self, message: discord.Message) -> None:
if not message.guild:
Expand Down
38 changes: 38 additions & 0 deletions src/intelstream/services/message_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,44 @@ async def store_chunks(self, chunks: list[Chunk]) -> int:

return len(metas)

async def rebuild_vector_index(self, batch_size: int = EMBED_BATCH_SIZE) -> int:
    """Recreate the message-chunk vector collection and re-embed every stored chunk.

    Chunks are paged out of the repository in `batch_size` groups, embedded,
    and upserted; progress is logged every ten batches.

    Returns:
        The number of chunks indexed.
    """
    total_chunks = await self._repository.count_message_chunk_metas()
    # Recreate even when empty so a corrupt collection is always reset.
    await self._vector_store.recreate_message_chunks_collection()

    if total_chunks == 0:
        logger.info("No stored message chunks to reindex")
        return 0

    indexed = 0
    offset = 0
    progress_every = batch_size * 10

    while metas := await self._repository.get_message_chunk_metas_batch(
        offset=offset,
        limit=batch_size,
    ):
        texts = [meta.text for meta in metas]
        embeddings = await self._embedding_service.embed_batch(texts)
        ids = (meta.id for meta in metas)
        await self._vector_store.upsert_message_chunks_batch(
            list(zip(ids, embeddings, strict=True))
        )

        indexed += len(metas)
        offset += len(metas)

        if indexed == total_chunks or indexed % progress_every == 0:
            logger.info(
                "Lore vector index rebuild progress",
                indexed=indexed,
                total=total_chunks,
            )

    logger.info("Lore vector index rebuild complete", indexed=indexed, total=total_chunks)
    return indexed

async def ingest_channel(
self,
channel: discord.TextChannel,
Expand Down
Loading
Loading