Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 108 additions & 38 deletions src/xai_sdk/aio/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import datetime
from typing import Optional, Sequence, Union

from opentelemetry.trace import SpanKind

from ..collections import (
DEFAULT_INDEXING_POLL_INTERVAL,
DEFAULT_INDEXING_TIMEOUT,
Expand All @@ -23,6 +25,9 @@
from ..files import _async_chunk_file_data
from ..poll_timer import PollTimer
from ..proto import collections_pb2, documents_pb2, shared_pb2, types_pb2
from ..telemetry import get_tracer

tracer = get_tracer(__name__)


class Client(BaseClient):
Expand Down Expand Up @@ -74,15 +79,26 @@ async def create(
else:
field_definitions_pb.append(field_definition)

return await self._collections_stub.CreateCollection(
collections_pb2.CreateCollectionRequest(
collection_name=name,
index_configuration=types_pb2.IndexConfiguration(model_name=model_name) if model_name else None,
chunk_configuration=chunk_configuration_pb,
metric_space=metric_space_pb,
field_definitions=field_definitions_pb,
with tracer.start_as_current_span(
name="collections.create_collection",
kind=SpanKind.CLIENT,
attributes={
"operation.name": "create_collection",
"provider.name": "xai",
},
) as span:
collection = await self._collections_stub.CreateCollection(
collections_pb2.CreateCollectionRequest(
collection_name=name,
index_configuration=types_pb2.IndexConfiguration(model_name=model_name) if model_name else None,
chunk_configuration=chunk_configuration_pb,
metric_space=metric_space_pb,
field_definitions=field_definitions_pb,
)
)
)
span.set_attribute("collection.id", collection.collection_id)
span.set_attribute("collection.name", collection.collection_name)
return collection

async def list(
self,
Expand Down Expand Up @@ -167,24 +183,42 @@ async def update(
chunk_configuration_pb = _chunk_configuration_to_pb(chunk_configuration)
else:
chunk_configuration_pb = chunk_configuration

return await self._collections_stub.UpdateCollection(
collections_pb2.UpdateCollectionRequest(
collection_id=collection_id,
collection_name=name,
chunk_configuration=chunk_configuration_pb,
with tracer.start_as_current_span(
name="collections.update_collection",
kind=SpanKind.CLIENT,
attributes={
"operation.name": "update_collection",
"provider.name": "xai",
},
) as span:
collection = await self._collections_stub.UpdateCollection(
collections_pb2.UpdateCollectionRequest(
collection_id=collection_id,
collection_name=name,
chunk_configuration=chunk_configuration_pb,
)
)
)
span.set_attribute("collection.id", collection.collection_id)
span.set_attribute("collection.name", collection.collection_name)
return collection

async def delete(self, collection_id: str) -> None:
"""Deletes a collection.

Args:
collection_id: The ID of the collection to delete.
"""
return await self._collections_stub.DeleteCollection(
collections_pb2.DeleteCollectionRequest(collection_id=collection_id)
)
with tracer.start_as_current_span(
name="collections.delete_collection",
kind=SpanKind.CLIENT,
attributes={
"operation.name": "delete_collection",
"provider.name": "xai",
},
) as _span:
return await self._collections_stub.DeleteCollection(
collections_pb2.DeleteCollectionRequest(collection_id=collection_id)
)

async def search(
self,
Expand Down Expand Up @@ -286,8 +320,17 @@ async def upload_document(
"""
# Upload the raw bytes via the streaming Files API, then attach to the collection.
upload_chunks = _async_chunk_file_data(filename=name, data=data)

uploaded_file = await self._files_stub.UploadFile(upload_chunks)
with tracer.start_as_current_span(
name="collections.upload_document",
kind=SpanKind.CLIENT,
attributes={
"operation.name": "upload_document",
"provider.name": "xai",
},
) as span:
uploaded_file = await self._files_stub.UploadFile(upload_chunks)
span.set_attribute("file.id", uploaded_file.id)
span.set_attribute("file.name", uploaded_file.filename)

# Attach the uploaded file to the target collection as a document.
await self._collections_stub.AddDocumentToCollection(
Expand Down Expand Up @@ -364,13 +407,21 @@ async def add_existing_document(
file_id: The ID of the file (document) to add.
fields: Additional metadata fields to store with the document in this collection.
"""
return await self._collections_stub.AddDocumentToCollection(
collections_pb2.AddDocumentToCollectionRequest(
collection_id=collection_id,
file_id=file_id,
fields=fields,
with tracer.start_as_current_span(
name="collections.add_existing_document",
kind=SpanKind.CLIENT,
attributes={
"operation.name": "add_existing_document",
"provider.name": "xai",
},
) as _span:
await self._collections_stub.AddDocumentToCollection(
collections_pb2.AddDocumentToCollectionRequest(
collection_id=collection_id,
file_id=file_id,
fields=fields,
)
)
)

async def list_documents(
self,
Expand Down Expand Up @@ -456,9 +507,17 @@ async def remove_document(self, collection_id: str, file_id: str) -> None:
collection_id: The ID of the collection to remove the document from.
file_id: The ID of the file (document) to remove.
"""
return await self._collections_stub.RemoveDocumentFromCollection(
collections_pb2.RemoveDocumentFromCollectionRequest(collection_id=collection_id, file_id=file_id)
)
with tracer.start_as_current_span(
name="collections.remove_document",
kind=SpanKind.CLIENT,
attributes={
"operation.name": "remove_document",
"provider.name": "xai",
},
) as _span:
return await self._collections_stub.RemoveDocumentFromCollection(
collections_pb2.RemoveDocumentFromCollectionRequest(collection_id=collection_id, file_id=file_id)
)

async def update_document(
self,
Expand All @@ -482,16 +541,27 @@ async def update_document(
Returns:
The updated metadata for the document.
"""
return await self._collections_stub.UpdateDocument(
collections_pb2.UpdateDocumentRequest(
collection_id=collection_id,
file_id=file_id,
name=name,
data=data,
content_type=content_type,
fields=fields,
with tracer.start_as_current_span(
name="collections.update_document",
kind=SpanKind.CLIENT,
attributes={
"operation.name": "update_document",
"provider.name": "xai",
},
) as span:
document = await self._collections_stub.UpdateDocument(
collections_pb2.UpdateDocumentRequest(
collection_id=collection_id,
file_id=file_id,
name=name,
data=data,
content_type=content_type,
fields=fields,
)
)
)
span.set_attribute("document.id", document.file_metadata.file_id)
span.set_attribute("document.name", document.file_metadata.name)
return document

async def reindex_document(self, collection_id: str, file_id: str) -> None:
"""Regenerates indices for a document.
Expand Down
Loading