feat: serialize LangChain multimodal content to ingest content blocks#487

Open
pratduv wants to merge 1 commit into main from
feature/sc-56110/multimodal-content-blocks-langchain

Conversation

@pratduv pratduv commented Feb 25, 2026

User description

Summary

  • Convert LangChain's multimodal message format (image_url, audio_url, video_url, etc.) into Galileo's IngestContentBlock schema (TextContentBlock, DataContentBlock) instead of flattening list content to a plain string
  • Add IngestTraces client for the orbit ingest service (opt-in via GALILEO_INGEST_URL env var)
  • Add debug logging at serialization and ingest payload boundaries

Changes

Serialization (src/galileo/utils/serialization.py)

  • New _convert_langchain_content_block() maps LangChain's {"type": "image_url", "image_url": {"url": "..."}} format to DataContentBlock(type="image", url="..."), with base64 data URI detection
  • EventSerializer now converts list content on both AIMessage and BaseMessage subclasses to content block arrays instead of extracting only the first text element
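The described mapping can be sketched roughly as follows. This is a hypothetical standalone version: plain dicts stand in for the galileo_core TextContentBlock/DataContentBlock classes, and the output field names are illustrative assumptions, not the repo's actual schema.

```python
# Hypothetical sketch of the image_url -> content-block mapping described
# above. Plain dicts stand in for TextContentBlock/DataContentBlock; the
# exact output field names are assumptions, not the repo's actual schema.

def convert_langchain_content_block(block: dict) -> dict:
    """Map one LangChain content block to a Galileo-style block dict."""
    block_type = block.get("type", "")
    if block_type == "text":
        return {"type": "text", "text": block.get("text", "")}
    if block_type == "image_url":
        url = block.get("image_url", {}).get("url", "")
        # base64 payloads arrive as data URIs ("data:image/png;base64,...")
        is_data_uri = url.startswith("data:")
        return {"type": "image", "url": url, "is_base64": is_data_uri}
    # unknown block types fall back to a text representation
    return {"type": "text", "text": str(block)}


blocks = [
    {"type": "text", "text": "What is in this image?"},
    {"type": "image_url", "image_url": {"url": "https://example.com/a.png"}},
]
print([convert_langchain_content_block(b) for b in blocks])
```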

Ingest client (src/galileo/traces.py)

  • New IngestTraces class sends traces directly to the orbit ingest service via httpx.AsyncClient
  • _log_ingest_content_blocks() helper logs content block types at DEBUG level before HTTP POST
  • New Routes.ingest_traces constant

Logger integration (src/galileo/logger/logger.py)

  • GalileoLogger creates an IngestTraces client when GALILEO_INGEST_URL is set, preferring it over the API-proxied path
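The opt-in wiring can be sketched like this. Only the GALILEO_INGEST_URL check mirrors the PR; the function and client names here are placeholders, not the logger's actual internals.

```python
# Hypothetical sketch of the opt-in selection described above: a direct
# ingest client is only constructed when GALILEO_INGEST_URL is set,
# otherwise the API-proxied path is kept. Client values are placeholders.
import os


def select_traces_client(api_client, make_ingest_client):
    """Prefer the direct ingest path when GALILEO_INGEST_URL is configured."""
    ingest_url = os.environ.get("GALILEO_INGEST_URL")
    if ingest_url:
        return make_ingest_client(ingest_url)
    return api_client


os.environ.pop("GALILEO_INGEST_URL", None)
print(select_traces_client("api", lambda url: f"ingest:{url}"))  # api path

os.environ["GALILEO_INGEST_URL"] = "http://localhost:8080"
print(select_traces_client("api", lambda url: f"ingest:{url}"))
```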

Handler (src/galileo/handlers/langchain/handler.py)

  • Debug log in on_chat_model_start showing multimodal message count

Test plan

  • TestConvertLangchainContentBlock -- unit tests for text, image URL, base64, audio, unknown fallback
  • TestMultimodalContentSerialization -- round-trip serialization of AIMessage/HumanMessage with multimodal content
  • test_on_chat_model_start_multimodal -- integration test verifying LangChain callback produces structured content blocks
  • Updated existing Responses API test to expect content block arrays instead of flattened string

Dependencies

Requires galileo-core#feature/sc-56110 to be merged and released first (adds IngestContentBlock, TextContentBlock, DataContentBlock to galileo_core.schemas.shared.content_blocks).

Made with Cursor


Generated description

Below is a concise technical summary of the changes proposed in this PR:
Enhances the LangChain callback handler to natively serialize multimodal content, such as images and audio, into Galileo's structured IngestContentBlock schema, moving away from flattened string representations. Introduces a new IngestTraces client and a dedicated /ingest/traces route, enabling direct and optimized trace submission to the orbit ingest service, configurable via an environment variable.

Multimodal Serialization
Implements the conversion of LangChain's multimodal message formats (e.g., image_url, audio_url) into Galileo's TextContentBlock and DataContentBlock schemas. This involves updating the EventSerializer to process list-based content in AIMessage and BaseMessage subclasses as structured content blocks, ensuring proper serialization for various message types and their streaming chunks.
Modified files (4)
  • src/galileo/utils/serialization.py
  • tests/test_langchain.py
  • tests/test_langchain_async.py
  • tests/utils/test_serialization.py
Direct Trace Ingestion
Introduces a new IngestTraces client for direct communication with the orbit ingest service, bypassing the main API for trace submission. This includes defining a new /ingest/traces/{project_id} route, integrating the client into the GalileoLogger for conditional use based on the GALILEO_INGEST_URL environment variable, and updating project dependencies to reflect the required galileo-core schema changes.
Modified files (5)
  • poetry.lock
  • pyproject.toml
  • src/galileo/constants/routes.py
  • src/galileo/logger/logger.py
  • src/galileo/traces.py
This pull request is reviewed by Baz.

@pratduv pratduv requested a review from a team as a code owner February 25, 2026 23:21
@pratduv pratduv requested review from dmcwhorter and removed request for dmcwhorter February 25, 2026 23:21
@pratduv pratduv force-pushed the feature/sc-56110/multimodal-content-blocks-langchain branch from 5617134 to aefd885 on February 25, 2026 23:32
Comment on lines +223 to +227
@async_warn_catch_exception(logger=_logger)
async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]:
if self.experiment_id:
traces_ingest_request.experiment_id = UUID(self.experiment_id)
elif self.log_stream_id:

IngestTraces.ingest_traces now reimplements the experiment/log_stream wiring (and by extension the model_dump/_log_ingest_content_blocks flow) that already exists in Traces.ingest_traces; keeping two copies means every future change to request preparation or logging must be applied twice. Can we extract a shared helper that sets experiment_id/log_stream_id, dumps the request, logs the content blocks, and optionally sets logging_method, then call it from both ingestion clients so we only maintain that plumbing in one place?

Finding type: Code Dedup and Conventions
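The suggested refactor could look roughly like this. Everything here is illustrative: the dataclass stands in for TracesIngestRequest, and the helper name and fields are assumptions, not the repo's actual API.

```python
# Illustrative sketch of the shared request-preparation helper the review
# proposes: one place that sets experiment_id/log_stream_id, dumps the
# request, and logs at DEBUG, callable from both ingestion clients.
import logging
from dataclasses import dataclass, field
from typing import Any, Optional

_logger = logging.getLogger(__name__)


@dataclass
class FakeIngestRequest:
    """Stand-in for TracesIngestRequest (names are assumptions)."""
    traces: list = field(default_factory=list)
    experiment_id: Optional[str] = None
    log_stream_id: Optional[str] = None

    def model_dump(self) -> dict[str, Any]:
        return {
            "traces": self.traces,
            "experiment_id": self.experiment_id,
            "log_stream_id": self.log_stream_id,
        }


def prepare_ingest_payload(
    request: FakeIngestRequest,
    experiment_id: Optional[str],
    log_stream_id: Optional[str],
) -> dict[str, Any]:
    """Set the target id, dump the request, and log it once, so both
    Traces.ingest_traces and IngestTraces.ingest_traces share the plumbing."""
    if experiment_id:
        request.experiment_id = experiment_id
    elif log_stream_id:
        request.log_stream_id = log_stream_id
    payload = request.model_dump()
    _logger.debug(
        "ingest payload targets experiment=%s log_stream=%s",
        payload["experiment_id"], payload["log_stream_id"],
    )
    return payload


print(prepare_ingest_payload(FakeIngestRequest(), None, "ls-123"))
```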



elif self.experiment_id:
self._traces_client = Traces(project_id=self.project_id, experiment_id=self.experiment_id)

if os.environ.get("GALILEO_INGEST_URL"):

Only created when this URL is set

Convert LangChain's multimodal message format (image_url, audio_url, etc.)
into Galileo's IngestContentBlock schema (TextContentBlock, DataContentBlock)
instead of flattening list content to a plain string.

Adds IngestTraces client for the orbit ingest service (opt-in via
GALILEO_INGEST_URL env var) and debug logging at serialization boundaries.

Made-with: Cursor
@pratduv pratduv force-pushed the feature/sc-56110/multimodal-content-blocks-langchain branch from 1d6a39e to 30195c9 on February 26, 2026 15:43

pratduv commented Feb 26, 2026

"""
Multimodal content ingestion via LangChain + Galileo callback.

Invokes the model with REAL calls for every LangChain message type that can
carry multimodal content, then flushes to the ingest service. A successful
flush (no errors) proves the Go ingest service can parse every content-block
variant.

Tests:
  1. Plain text messages
  2. HumanMessage with image_url
  3. HumanMessage with base64 image
  4. Streaming with multimodal input
  5. ToolMessage with multimodal content in conversation history
  6. AIMessage with list content in conversation history

Expects these env vars in ../.env (or the logstreams/.env):
    GALILEO_CONSOLE_URL, GALILEO_API_KEY, GALILEO_PROJECT, GALILEO_LOG_STREAM, OPENAI_API_KEY

Optional:
    GALILEO_INGEST_URL  - if set, uses the orbit ingest service instead of v2 API

Usage:
    cd nbs/examples/py_files/logstreams
    python langchain_multimodal_example.py
"""

import logging
from pathlib import Path

from dotenv import load_dotenv

load_dotenv(Path(__file__).resolve().parents[3] / ".env")

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s [%(levelname)s] %(message)s")

import contextlib  # noqa: E402

from langchain_core.messages import (  # noqa: E402
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_openai import ChatOpenAI  # noqa: E402

from galileo import galileo_context  # noqa: E402
from galileo.handlers.langchain import GalileoCallback  # noqa: E402

SAMPLE_IMAGE_URL = (
    "https://upload.wikimedia.org/wikipedia/commons/thumb/4/47/"
    "PNG_transparency_demonstration_1.png/280px-PNG_transparency_demonstration_1.png"
)

RED_SQUARE_B64 = (
    "data:image/png;base64,"
    "iVBORw0KGgoAAAANSUhEUgAAAGQAAABkCAIAAAD/gAIDAAABFUlEQVR4nO3OUQkAIABEsetfWiv4"
    "Nx4IC7Cd7XvkByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+"
    "EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDi"
    "ByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gch"
    "fhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIX4Q"
    "4gchfhDiByF+EOIHIX4Q4gchfhDiByF+EOIHIReeLesrH9s1agAAAABJRU5ErkJggg=="
)


def run_llm_tests() -> None:
    """Run live LLM calls through the Galileo callback, covering all message types."""
    galileo_context.init()

    callback = GalileoCallback()
    model = ChatOpenAI(model="gpt-4o", temperature=0.0)

    # --- Test 1: Plain text ---
    print("\n--- Test 1: Text-only ---")  # noqa: T201
    resp = model.invoke(
        [SystemMessage(content="You are a helpful assistant. Be concise."), HumanMessage(content="What is 2 + 2?")],
        config={"callbacks": [callback]},
    )
    print(f"Response: {resp.content}")  # noqa: T201

    # --- Test 2: HumanMessage with image URL ---
    print("\n--- Test 2: HumanMessage + image URL ---")  # noqa: T201
    resp = model.invoke(
        [
            SystemMessage(content="You are a helpful assistant that can analyze images. Be concise."),
            HumanMessage(
                content=[
                    {"type": "text", "text": "What do you see in this image? Describe it briefly."},
                    {"type": "image_url", "image_url": {"url": SAMPLE_IMAGE_URL}},
                ]
            ),
        ],
        config={"callbacks": [callback]},
    )
    print(f"Response: {resp.content}")  # noqa: T201

    # --- Test 3: HumanMessage with base64 image ---
    print("\n--- Test 3: HumanMessage + base64 image ---")  # noqa: T201
    with contextlib.suppress(Exception):
        resp = model.invoke(
            [
                SystemMessage(content="You are a helpful assistant. Be concise."),
                HumanMessage(
                    content=[
                        {"type": "text", "text": "What color is this image?"},
                        {"type": "image_url", "image_url": {"url": RED_SQUARE_B64}},
                    ]
                ),
            ],
            config={"callbacks": [callback]},
        )
        print(f"Response: {resp.content}")  # noqa: T201

    # --- Test 4: Streaming with multimodal input ---
    print("\n--- Test 4: Streaming multimodal ---")  # noqa: T201
    chunks = []
    for chunk in model.stream(
        [
            SystemMessage(content="You are a helpful assistant that can analyze images. Be concise."),
            HumanMessage(
                content=[
                    {"type": "text", "text": "Describe this image in one sentence."},
                    {"type": "image_url", "image_url": {"url": SAMPLE_IMAGE_URL}},
                ]
            ),
        ],
        config={"callbacks": [callback]},
    ):
        chunks.append(chunk)
    print(f"Streaming: {len(chunks)} chunks received")  # noqa: T201

    # --- Test 5: ToolMessage with multimodal content in conversation history ---
    # LangChain passes the entire conversation via on_chat_model_start, which
    # serializes every message through EventSerializer. Placing a ToolMessage
    # with list[dict] content in history tests that code path end-to-end.
    print("\n--- Test 5: ToolMessage (multimodal) in history ---")  # noqa: T201
    resp = model.invoke(
        [
            SystemMessage(content="You are a helpful assistant. Be concise."),
            HumanMessage(content="Generate a chart of our sales data."),
            AIMessage(
                content="",
                tool_calls=[{"id": "call_chart", "name": "generate_chart", "args": {"data": "sales"}}],
            ),
            ToolMessage(
                content=[
                    {"type": "text", "text": "Chart generated successfully:"},
                    {"type": "image_url", "image_url": {"url": SAMPLE_IMAGE_URL}},
                ],
                tool_call_id="call_chart",
            ),
            HumanMessage(content="Summarize what you see in the chart you generated."),
        ],
        config={"callbacks": [callback]},
    )
    print(f"Response: {resp.content}")  # noqa: T201

    # --- Test 6: AIMessage with list content in conversation history ---
    # Tests serialization of AIMessage where .content is a list of dicts
    # (e.g., from a model that returns structured multimodal output).
    print("\n--- Test 6: AIMessage (list content) in history ---")  # noqa: T201
    resp = model.invoke(
        [
            SystemMessage(content="You are a helpful assistant. Be concise."),
            HumanMessage(
                content=[
                    {"type": "text", "text": "What is in this image?"},
                    {"type": "image_url", "image_url": {"url": SAMPLE_IMAGE_URL}},
                ]
            ),
            AIMessage(
                content=[
                    {"type": "text", "text": "The image shows overlapping colored dice on a transparent background."},
                ]
            ),
            HumanMessage(content="Are the dice all the same color?"),
        ],
        config={"callbacks": [callback]},
    )
    print(f"Response: {resp.content}")  # noqa: T201

    # --- Flush to Galileo ---
    print("\n--- Flushing to Galileo ---")  # noqa: T201
    galileo_context.flush()
    print("Flush complete.")  # noqa: T201


def main() -> None:
    run_llm_tests()


if __name__ == "__main__":
    main()
