Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 8 additions & 29 deletions src/intelstream/adapters/strategies/llm_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,8 @@ async def discover(
if isinstance(posts_data, list):
posts = []
for p in posts_data:
if (
isinstance(p, dict)
and isinstance(p.get("url"), str)
and p.get("url")
):
posts.append(
DiscoveredPost(url=p["url"], title=p.get("title", ""))
)
if isinstance(p, dict) and isinstance(p.get("url"), str) and p.get("url"):
posts.append(DiscoveredPost(url=p["url"], title=p.get("title", "")))
if posts:
logger.debug(
"Using cached LLM extraction",
Expand Down Expand Up @@ -131,12 +125,7 @@ def _get_content_hash(self, html: str) -> str:
):
tag.decompose()

main = (
soup.find("main")
or soup.find("article")
or soup.find(id="content")
or soup.body
)
main = soup.find("main") or soup.find("article") or soup.find(id="content") or soup.body

if main:
text = " ".join(main.get_text().split())
Expand All @@ -150,16 +139,10 @@ async def _fetch_html(self, url: str) -> str | None:
}
try:
if self._http_client:
response = await self._http_client.get(
url, headers=headers, follow_redirects=True
)
response = await self._http_client.get(url, headers=headers, follow_redirects=True)
else:
async with httpx.AsyncClient(
timeout=get_settings().http_timeout_seconds
) as client:
response = await client.get(
url, headers=headers, follow_redirects=True
)
async with httpx.AsyncClient(timeout=get_settings().http_timeout_seconds) as client:
response = await client.get(url, headers=headers, follow_redirects=True)
response.raise_for_status()
return response.text
except httpx.HTTPError as e:
Expand All @@ -169,9 +152,7 @@ async def _fetch_html(self, url: str) -> str | None:
def _clean_html(self, html: str) -> str:
soup = BeautifulSoup(html, "lxml")

for tag in soup.find_all(
["script", "style", "noscript", "svg", "path", "iframe"]
):
for tag in soup.find_all(["script", "style", "noscript", "svg", "path", "iframe"]):
tag.decompose()

for tag in soup.find_all(True):
Expand Down Expand Up @@ -286,7 +267,5 @@ def parse_and_validate(data: str) -> list[dict[str, str]] | None:
if result is not None:
return result

logger.warning(
"Failed to extract JSON from LLM response", response_preview=text[:200]
)
logger.warning("Failed to extract JSON from LLM response", response_preview=text[:200])
return []
16 changes: 4 additions & 12 deletions src/intelstream/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,12 @@ class Settings(BaseSettings):
description="LLM provider for summarization: anthropic, openai, gemini, or kimi",
)

anthropic_api_key: str | None = Field(
default=None, description="Anthropic API key for Claude"
)
anthropic_api_key: str | None = Field(default=None, description="Anthropic API key for Claude")
openai_api_key: str | None = Field(default=None, description="OpenAI API key")
gemini_api_key: str | None = Field(
default=None, description="Google Gemini API key"
)
kimi_api_key: str | None = Field(
default=None, description="Kimi (Moonshot AI) API key"
)
gemini_api_key: str | None = Field(default=None, description="Google Gemini API key")
kimi_api_key: str | None = Field(default=None, description="Kimi (Moonshot AI) API key")

youtube_api_key: str | None = Field(
default=None, description="YouTube Data API key (optional)"
)
youtube_api_key: str | None = Field(default=None, description="YouTube Data API key (optional)")

twitter_bearer_token: str | None = Field(
default=None,
Expand Down
27 changes: 22 additions & 5 deletions src/intelstream/database/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,23 +938,40 @@ async def add_message_chunk_metas_batch(self, chunks: list[MessageChunkMeta]) ->
session.add_all(chunks)
await session.commit()

async def count_message_chunk_metas(self) -> int:
async def count_message_chunk_metas(self, guild_id: str | None = None) -> int:
async with self.session() as session:
result = await session.execute(select(func.count()).select_from(MessageChunkMeta))
query = select(func.count()).select_from(MessageChunkMeta)
if guild_id is not None:
query = query.where(MessageChunkMeta.guild_id == guild_id)
result = await session.execute(query)
return int(result.scalar_one())

async def get_message_chunk_metas_batch(
self, offset: int = 0, limit: int = 100
self,
offset: int = 0,
limit: int = 100,
guild_id: str | None = None,
) -> list[MessageChunkMeta]:
async with self.session() as session:
query = select(MessageChunkMeta)
if guild_id is not None:
query = query.where(MessageChunkMeta.guild_id == guild_id)
result = await session.execute(
select(MessageChunkMeta)
.order_by(MessageChunkMeta.start_timestamp.asc(), MessageChunkMeta.id.asc())
query.order_by(MessageChunkMeta.start_timestamp.asc(), MessageChunkMeta.id.asc())
.offset(offset)
.limit(limit)
)
return list(result.scalars().all())

async def get_message_chunk_guild_ids(self) -> list[str]:
async with self.session() as session:
result = await session.execute(
select(MessageChunkMeta.guild_id)
.distinct()
.order_by(MessageChunkMeta.guild_id.asc())
)
return [str(guild_id) for guild_id in result.scalars().all()]

async def get_message_chunk_metas_by_ids(self, chunk_ids: list[str]) -> list[MessageChunkMeta]:
if not chunk_ids:
return []
Expand Down
128 changes: 99 additions & 29 deletions src/intelstream/database/vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,40 @@ def __init__(self, data_dir: str, dimensions: int = 384) -> None:
self._data_dir = data_dir
self._dimensions = dimensions
self._articles: zvec.Collection | None = None
self._message_chunks: zvec.Collection | None = None
self._message_chunks: dict[str, zvec.Collection] = {}

async def initialize(self) -> None:
await asyncio.to_thread(os.makedirs, self._data_dir, exist_ok=True)
self._articles = await self._open_or_create_collection(self._ARTICLES_COLLECTION)
self._message_chunks = await self._open_or_create_collection(
self._MESSAGE_CHUNKS_COLLECTION
)
await asyncio.to_thread(self._warn_if_legacy_message_chunk_collection_present)

def _collection_path(self, collection_name: str) -> str:
return str(Path(self._data_dir) / collection_name)

def _collection_attr_name(self, collection_name: str) -> str:
if collection_name == self._ARTICLES_COLLECTION:
return "_articles"
if collection_name == self._MESSAGE_CHUNKS_COLLECTION:
return "_message_chunks"
raise ValueError(f"Unknown collection name: {collection_name}")

def _message_chunk_collection_name(self, guild_id: str) -> str:
return f"{self._MESSAGE_CHUNKS_COLLECTION}_{guild_id}"

def _message_chunk_collection_path(self, guild_id: str) -> str:
return str(Path(self._data_dir) / self._MESSAGE_CHUNKS_COLLECTION / guild_id)

def _warn_if_legacy_message_chunk_collection_present(self) -> None:
legacy_root = Path(self._collection_path(self._MESSAGE_CHUNKS_COLLECTION))
if not legacy_root.exists():
return

legacy_files = [entry.name for entry in legacy_root.iterdir() if entry.is_file()]
if legacy_files:
logger.warning(
"Detected legacy global lore vector collection files; they are no longer used",
path=str(legacy_root),
files=sorted(legacy_files),
)

def _build_schema(self, collection_name: str) -> zvec.CollectionSchema:
import zvec

Expand All @@ -62,10 +77,12 @@ def _build_schema(self, collection_name: str) -> zvec.CollectionSchema:
vectors=zvec.VectorSchema("embedding", zvec.DataType.VECTOR_FP32, self._dimensions),
)

async def _open_or_create_collection(self, collection_name: str) -> zvec.Collection:
async def _open_or_create_collection(
self, collection_name: str, path: str | None = None
) -> zvec.Collection:
import zvec

path = self._collection_path(collection_name)
path = path or self._collection_path(collection_name)
try:
collection = await asyncio.to_thread(
zvec.create_and_open,
Expand Down Expand Up @@ -107,6 +124,48 @@ async def _recreate_collection(self, collection_name: str) -> zvec.Collection:
setattr(self, attr_name, recreated)
return recreated

async def _message_chunk_collection(
self, guild_id: str, *, create: bool
) -> zvec.Collection | None:
collection = self._message_chunks.get(guild_id)
if collection is not None:
return collection

path = self._message_chunk_collection_path(guild_id)
if not create and not await asyncio.to_thread(os.path.exists, path):
return None

collection = await self._open_or_create_collection(
self._message_chunk_collection_name(guild_id),
path=path,
)
self._message_chunks[guild_id] = collection
return collection

async def _recreate_message_chunk_collection(self, guild_id: str) -> zvec.Collection:
collection = self._message_chunks.pop(guild_id, None)
path = self._message_chunk_collection_path(guild_id)

if collection is not None:
try:
await asyncio.to_thread(collection.destroy)
except Exception:
logger.warning(
"Failed to destroy vector collection cleanly, removing files manually",
collection=self._message_chunk_collection_name(guild_id),
path=path,
)

if await asyncio.to_thread(os.path.exists, path):
await asyncio.to_thread(shutil.rmtree, path, True)

recreated = await self._open_or_create_collection(
self._message_chunk_collection_name(guild_id),
path=path,
)
self._message_chunks[guild_id] = recreated
return recreated

async def _doc_count(self, collection: zvec.Collection | None) -> int:
if collection is None:
raise RuntimeError("VectorStore not initialized")
Expand Down Expand Up @@ -155,57 +214,68 @@ async def delete_article(self, content_item_id: str) -> None:
async def article_doc_count(self) -> int:
return await self._doc_count(self._articles)

async def upsert_message_chunk(self, chunk_id: str, embedding: list[float]) -> None:
async def upsert_message_chunk(
self, guild_id: str, chunk_id: str, embedding: list[float]
) -> None:
import zvec

if self._message_chunks is None:
collection = await self._message_chunk_collection(guild_id, create=True)
if collection is None:
raise RuntimeError("VectorStore not initialized")
doc = zvec.Doc(
id=chunk_id,
vectors={"embedding": embedding},
)
await asyncio.to_thread(self._message_chunks.upsert, [doc])
await asyncio.to_thread(collection.upsert, [doc])

async def upsert_message_chunks_batch(self, items: list[tuple[str, list[float]]]) -> None:
async def upsert_message_chunks_batch(
self, guild_id: str, items: list[tuple[str, list[float]]]
) -> None:
import zvec

if self._message_chunks is None:
collection = await self._message_chunk_collection(guild_id, create=True)
if collection is None:
raise RuntimeError("VectorStore not initialized")
if not items:
return
docs = [zvec.Doc(id=cid, vectors={"embedding": emb}) for cid, emb in items]
await asyncio.to_thread(self._message_chunks.upsert, docs)
await asyncio.to_thread(collection.upsert, docs)

async def search_message_chunks(
self, query_embedding: list[float], topk: int = 30
self, guild_id: str, query_embedding: list[float], topk: int = 30
) -> list[ChunkSearchResult]:
import zvec

if self._message_chunks is None:
raise RuntimeError("VectorStore not initialized")
collection = await self._message_chunk_collection(guild_id, create=False)
if collection is None:
return []
results: Any = await asyncio.to_thread(
self._message_chunks.query,
collection.query,
zvec.VectorQuery("embedding", vector=query_embedding),
topk=topk,
)
return [ChunkSearchResult(chunk_id=r.id, score=r.score) for r in results]

async def delete_message_chunks_by_ids(self, chunk_ids: list[str]) -> None:
if self._message_chunks is None:
raise RuntimeError("VectorStore not initialized")
async def delete_message_chunks_by_ids(self, guild_id: str, chunk_ids: list[str]) -> None:
collection = await self._message_chunk_collection(guild_id, create=False)
if collection is None:
return
for chunk_id in chunk_ids:
await asyncio.to_thread(self._message_chunks.delete, chunk_id)
await asyncio.to_thread(collection.delete, chunk_id)

async def message_chunk_doc_count(self) -> int:
return await self._doc_count(self._message_chunks)
async def message_chunk_doc_count(self, guild_id: str) -> int:
collection = await self._message_chunk_collection(guild_id, create=False)
if collection is None:
return 0
return await self._doc_count(collection)

async def recreate_message_chunks_collection(self) -> None:
await self._recreate_collection(self._MESSAGE_CHUNKS_COLLECTION)
async def recreate_message_chunks_collection(self, guild_id: str) -> None:
await self._recreate_message_chunk_collection(guild_id)

async def close(self) -> None:
if self._articles is not None:
await asyncio.to_thread(self._articles.flush)
self._articles = None
if self._message_chunks is not None:
await asyncio.to_thread(self._message_chunks.flush)
self._message_chunks = None
for collection in self._message_chunks.values():
await asyncio.to_thread(collection.flush)
self._message_chunks.clear()
Loading
Loading