Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions python/packages/core/agent_framework/openai/_assistants_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import json
import logging
import sys
from collections.abc import (
AsyncIterable,
Expand All @@ -16,7 +17,9 @@

from openai import AsyncOpenAI
from openai.types.beta.threads import (
FileCitationAnnotation,
FileCitationDeltaAnnotation,
FilePathAnnotation,
FilePathDeltaAnnotation,
ImageURLContentBlockParam,
ImageURLParam,
Expand All @@ -26,6 +29,9 @@
TextContentBlockParam,
TextDeltaBlock,
)
from openai.types.beta.threads import (
Message as ThreadMessage,
)
from openai.types.beta.threads.run_create_params import AdditionalMessage
from openai.types.beta.threads.run_submit_tool_outputs_params import ToolOutput
from openai.types.beta.threads.runs import RunStep
Expand Down Expand Up @@ -72,6 +78,8 @@
if TYPE_CHECKING:
from .._middleware import MiddlewareTypes

logger = logging.getLogger("agent_framework.openai")


# region OpenAI Assistants Options TypedDict

Expand Down Expand Up @@ -610,6 +618,69 @@ async def _process_stream_events(self, stream: Any, thread_id: str) -> AsyncIter
raw_representation=response.data,
response_id=response_id,
)
elif response.event == "thread.message.completed" and isinstance(response.data, ThreadMessage):
# Process completed message to extract fully resolved annotations.
# Delta events may carry partial/empty annotation data; the completed
# message contains the final text with all citation details populated.
completed_contents: list[Content] = []
for block in response.data.content:
if block.type != "text":
continue
text_content = Content.from_text(block.text.value)
if block.text.annotations:
text_content.annotations = []
for completed_annotation in block.text.annotations:
if isinstance(completed_annotation, FileCitationAnnotation):
props: dict[str, Any] = {
"text": completed_annotation.text,
}
ann = Annotation(
type="citation",
additional_properties=props,
raw_representation=completed_annotation,
)
if completed_annotation.file_citation and completed_annotation.file_citation.file_id:
ann["file_id"] = completed_annotation.file_citation.file_id
if completed_annotation.start_index is not None and completed_annotation.end_index is not None:
ann["annotated_regions"] = [
TextSpanRegion(
type="text_span",
start_index=completed_annotation.start_index,
end_index=completed_annotation.end_index,
)
]
text_content.annotations.append(ann)
elif isinstance(completed_annotation, FilePathAnnotation):
ann = Annotation(
type="citation",
additional_properties={
"text": completed_annotation.text,
},
raw_representation=completed_annotation,
)
if completed_annotation.file_path and completed_annotation.file_path.file_id:
ann["file_id"] = completed_annotation.file_path.file_id
if completed_annotation.start_index is not None and completed_annotation.end_index is not None:
ann["annotated_regions"] = [
TextSpanRegion(
type="text_span",
start_index=completed_annotation.start_index,
end_index=completed_annotation.end_index,
)
]
text_content.annotations.append(ann)
else:
logger.debug("Unparsed annotation type: %s", completed_annotation.type)
completed_contents.append(text_content)
if completed_contents:
yield ChatResponseUpdate(
role="assistant",
contents=completed_contents,
conversation_id=thread_id,
message_id=response_id,
raw_representation=response.data,
response_id=response_id,
)
elif response.event == "thread.run.requires_action" and isinstance(response.data, Run):
contents = self._parse_function_calls_from_assistants(response.data, response_id)
if contents:
Expand Down
Loading