Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### New features

* Providers now distinguish text content from thinking content while streaming via the new `stream_content()` method. This allows downstream packages like shinychat to provide specific UI for thinking content. (#265)
* `.stream()` and `.stream_async()` now support a `data_model` parameter for structured data extraction while streaming. (#262)
* `.to_solver()` now supports a `data_model` parameter for structured data extraction in evals. When provided, the solver uses `.chat_structured()` instead of `.chat()` and outputs JSON-serialized data. (#264)

Expand Down
37 changes: 36 additions & 1 deletion chatlas/_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,25 @@ async def chat_perform_async(
) -> AsyncIterable[ChatCompletionChunkT] | ChatCompletionT: ...

@abstractmethod
def stream_text(self, chunk: ChatCompletionChunkT) -> Optional[str]: ...
def stream_content(self, chunk: ChatCompletionChunkT) -> Optional[Content]:
    """
    Convert a single streaming chunk into a `Content` object.

    Concrete providers implement this to surface the chunk's payload as,
    e.g., `ContentText` or `ContentThinking`, letting consumers tell
    answer text apart from reasoning while streaming. Return ``None``
    when the chunk carries no renderable content.
    """
    ...

def stream_text(self, chunk: ChatCompletionChunkT) -> Optional[str]:
    """
    Extract the text portion of a streaming chunk.

    Convenience wrapper around `stream_content()`: the chunk is first
    mapped to a `Content` object, whose textual representation is then
    returned. Yields ``None`` when the chunk carries no content.
    """
    content = self.stream_content(chunk)
    return None if content is None else _content_text(content)

@abstractmethod
def stream_merge_chunks(
Expand Down Expand Up @@ -385,3 +403,20 @@ def batch_result_turn(
Turn object or None if the result was an error
"""
raise NotImplementedError("This provider does not support batch processing")


def _content_text(content: Content) -> str:
    """
    Render a `Content` object as plain text.

    Helper for `stream_text()`: flattens the richer objects produced by
    `stream_content()` into the string each chunk contributes to the
    text stream. Unknown content types fall back to `str()`.
    """
    # Imported lazily to avoid a circular import at module load time.
    from ._content import ContentText, ContentThinking

    if isinstance(content, ContentThinking):
        return content.thinking
    if isinstance(content, ContentText):
        return content.text
    return str(content)
13 changes: 10 additions & 3 deletions chatlas/_provider_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,19 @@ def _structured_tool_call(**kwargs: Any):

return kwargs_full

def stream_text(self, chunk) -> Optional[str]:
def stream_content(self, chunk):
    """
    Map an Anthropic streaming event to a `Content` object (or ``None``).
    """
    # Only content-block deltas carry streamable text/thinking.
    if chunk.type != "content_block_delta":
        return None
    delta = chunk.delta
    if delta.type == "text_delta":
        # Drop empty/whitespace-only deltas so ContentText never renders
        # them as "[empty string]".
        if delta.text and not delta.text.isspace():
            return ContentText(text=delta.text)
        return None
    if delta.type == "thinking_delta":
        if delta.thinking and not delta.thinking.isspace():
            return ContentThinking(thinking=delta.thinking)
        return None
    return None

def stream_merge_chunks(self, completion, chunk):
Expand Down
25 changes: 22 additions & 3 deletions chatlas/_provider_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ContentJson,
ContentPDF,
ContentText,
ContentThinking,
ContentToolRequest,
ContentToolResult,
)
Expand Down Expand Up @@ -361,10 +362,26 @@ def _chat_perform_args(

return kwargs_full

def stream_text(self, chunk) -> Optional[str]:
def stream_content(self, chunk):
    """
    Map a Google GenAI streaming chunk to a `Content` object (or ``None``).
    """
    try:
        # Non-text chunks (e.g., tool requests) lack these attributes;
        # any error below simply means "no content in this chunk".
        cands = chunk.candidates
        if not cands:
            return None
        body = cands[0].content
        if body is None or not body.parts:
            return None
        first = body.parts[0]
        text = first.text
        # Skip empty/whitespace deltas so ContentText never renders them
        # as "[empty string]".
        if not text or text.isspace():
            return None
        if getattr(first, "thought", False):
            # Parts flagged as "thought" are model reasoning, not answer text.
            return ContentThinking(thinking=text)
        return ContentText(text=text)
    except Exception:
        return None

Expand Down Expand Up @@ -553,6 +570,8 @@ def _as_turn(
if text:
if has_data_model:
contents.append(ContentJson(value=orjson.loads(text)))
elif part.get("thought"):
contents.append(ContentThinking(thinking=text))
else:
contents.append(ContentText(text=text))
function_call = part.get("function_call")
Expand Down
13 changes: 9 additions & 4 deletions chatlas/_provider_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,21 @@ def _chat_perform_args(

return kwargs_full

def stream_text(self, chunk):
def stream_content(self, chunk):
    """
    Map a Responses-API streaming event to a `Content` object (or ``None``).
    """
    if chunk.type == "response.output_text.delta":
        # https://platform.openai.com/docs/api-reference/responses-streaming/response/output_text/delta
        # Filter empty/whitespace to avoid ContentText converting to "[empty string]"
        if not chunk.delta or chunk.delta.isspace():
            return None
        return ContentText(text=chunk.delta)
    if chunk.type == "response.reasoning_summary_text.delta":
        # https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/delta
        if not chunk.delta or chunk.delta.isspace():
            return None
        return ContentThinking(thinking=chunk.delta)
    # All other event types carry no streamable content. Note that
    # "response.reasoning_summary_text.done" previously emitted a bare
    # "\n\n" separator string; this method's contract is Optional[Content],
    # so returning a plain str would leak an untyped value to consumers.
    # (This also removes the unreachable duplicate `return None`.)
    return None

def stream_merge_chunks(self, completion, chunk):
Expand Down
8 changes: 6 additions & 2 deletions chatlas/_provider_openai_completions.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,14 @@ def _chat_perform_args(

return kwargs_full

def stream_text(self, chunk):
def stream_content(self, chunk):
    """
    Pull the text delta out of a chat-completions chunk as `ContentText`.
    """
    if not chunk.choices:
        return None
    delta_text = chunk.choices[0].delta.content
    # Drop empty/whitespace deltas so ContentText never renders them as
    # "[empty string]".
    if delta_text and not delta_text.isspace():
        return ContentText(text=delta_text)
    return None

def stream_merge_chunks(self, completion, chunk):
chunkd = chunk.model_dump()
Expand Down
8 changes: 6 additions & 2 deletions chatlas/_provider_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,17 @@ def _complete_request(

return req

def stream_text(self, chunk):
def stream_content(self, chunk):
    """
    Extract streamed text from a Snowflake chunk as `ContentText`.
    """
    if not chunk.choices:
        return None
    delta = chunk.choices[0].delta
    # Snowflake deltas are dict-like, and "content" may be absent entirely.
    if delta is None or "content" not in delta:
        return None
    piece = delta["content"]
    # Drop empty/whitespace pieces so ContentText never renders them as
    # "[empty string]".
    if piece and not piece.isspace():
        return ContentText(text=piece)
    return None

# Snowflake sort-of follows OpenAI/Anthropic streaming formats except they
# don't have the critical "index" field in the delta that the merge logic
Expand Down
9 changes: 6 additions & 3 deletions tests/test_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ async def test_simple_streaming_chat_async():
chunks = [chunk async for chunk in res]
assert len(chunks) > 2
result = "".join(chunks)
rainbow_re = "^red *\norange *\nyellow *\ngreen *\nblue *\nindigo *\nviolet *\n?$"
assert re.match(rainbow_re, result.lower())
# Streaming may not include whitespace chunks, so check content without whitespace
res_normalized = re.sub(r"\s+", "", result).lower()
assert res_normalized == "redorangeyellowgreenblueindigoviolet"
turn = chat.get_last_turn()
assert turn is not None
assert re.match(rainbow_re, turn.text.lower())
# Turn text should have the full response with whitespace
res_turn = re.sub(r"\s+", "", turn.text).lower()
assert res_turn == "redorangeyellowgreenblueindigoviolet"


def test_basic_repr(snapshot):
Expand Down
Loading