93 changes: 93 additions & 0 deletions dev/README.md
@@ -0,0 +1,93 @@
# Development Tools for Processors

## Processor Trace

Processor traces allow you to record the inputs, outputs, and internal steps of
a processor during its execution. This is useful for debugging, analysis, and
understanding processor behavior.

A trace is a timeline of events, where each event represents an input part, an
output part, or a call to a sub-processor. Events are timestamped and ordered
chronologically. If a processor calls other processors, sub-traces are created
and nested within the main trace, providing a hierarchical view of execution.

### Enabling Tracing

To enable trace collection for a processor, use a `Trace` context manager. The
examples here use the `SyncFileTrace` implementation. Other sinks could be
implemented in the future (e.g. storing traces in a database, or streaming
events to a file as they occur instead of writing once the trace is done).

```python
import asyncio
from genai_processors import processor
from genai_processors.dev import trace_file


@processor.processor_function
async def my_processor_fn(content):
  ...


async def main():
  trace_dir = '/path/to/your/trace/directory'
  parts = ['Hello!']  # Any input content accepted by the processor.
  # Any processor call within this context will be traced. Replace
  # `trace_file.SyncFileTrace` with another tracing implementation if needed.
  async with trace_file.SyncFileTrace(trace_dir):
    await processor.apply_async(my_processor_fn, parts)


asyncio.run(main())
```

### Default implementation: write to files

The default tracing implementation is `trace_file.SyncFileTrace`. When a
processor is called within a `SyncFileTrace` scope, its execution is recorded
and saved into two files under the `trace_dir` provided to the trace scope:

- `{processor_name}_{trace_id}.json` containing a JSON dictionary that can be
loaded for further programmatic analysis using `SyncFileTrace.load`:

```python
import os

from genai_processors.dev import trace_file

trace_dir = '/path/to/your/trace/directory'
traces = []
for f in os.listdir(trace_dir):
  if f.endswith('.json'):
    traces.append(
        trace_file.SyncFileTrace.load(os.path.join(trace_dir, f))
    )
```

- `{processor_name}_{trace_id}.html` containing an HTML representation of the
trace that can be viewed in a web browser. It contains the same information as
the JSON file.

### Implementing a custom trace sink

To implement a custom trace sink (e.g., save to a database, stream to a network
location), you need to extend the abstract base class `trace.Trace` from
`genai_processors.dev.trace` and implement its abstract methods. Your new class
can then be used in place of `SyncFileTrace`.

You must implement the following methods:

* `async def add_input(self, part: content_api.ProcessorPart) -> None`:
Handles input parts received by the processor.
* `async def add_output(self, part: content_api.ProcessorPart) -> None`:
Handles output parts produced by the processor.
* `async def add_sub_trace(self, name: str) -> Trace`:
Handles the start of a nested processor call. The returned trace should be an
instance of your custom trace implementation.
* `async def _finalize(self) -> None`: Called when the trace context is
exited. Use this to perform final actions like flushing buffers, closing
connections, or writing data to disk.

**Asynchronous Design**

All event-handling methods (`add_input`, `add_output`, `add_sub_trace`) and
`_finalize` are `async`. This design prevents tracing from blocking the
processor's execution thread, which is critical in an asynchronous framework.
If your tracing implementation needs to perform I/O (e.g., writing to a remote
database or file system), you can use `await` for these operations without
blocking the processor.
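
The interface above can be sketched with a minimal in-memory sink. This is an
illustrative, hypothetical example: it uses a plain class and string parts so
it runs standalone, whereas a real sink would extend `trace.Trace` from
`genai_processors.dev.trace` and receive `content_api.ProcessorPart` objects.

```python
import asyncio
import datetime


class InMemoryTrace:
  """Hypothetical sketch of a trace sink that keeps events in memory."""

  def __init__(self, name=None):
    self.name = name
    self.events = []       # Chronological (timestamp, kind, payload) tuples.
    self.sub_traces = []   # Nested traces from sub-processor calls.
    self.finalized = False

  async def add_input(self, part):
    self.events.append((datetime.datetime.now(), 'input', part))

  async def add_output(self, part):
    self.events.append((datetime.datetime.now(), 'output', part))

  async def add_sub_trace(self, name):
    # Return an instance of the same class so nested calls are recorded too.
    sub = InMemoryTrace(name=name)
    self.sub_traces.append(sub)
    return sub

  async def _finalize(self):
    # A real sink would flush buffers or close connections here.
    self.finalized = True


async def demo():
  trace = InMemoryTrace(name='my_processor')
  await trace.add_input('hello')
  await trace.add_output('world')
  sub = await trace.add_sub_trace('nested_call')
  await sub.add_output('inner')
  await trace._finalize()
  return trace


trace = asyncio.run(demo())
```

Because every method is `async`, a sink backed by a remote store could `await`
its writes without blocking the processor.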
15 changes: 15 additions & 0 deletions dev/__init__.py
@@ -0,0 +1,15 @@
# Copyright 2026 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Dev-only tools and features for genai processors."""
166 changes: 166 additions & 0 deletions dev/trace.py
@@ -0,0 +1,166 @@
"""Abstract class of a trace to collect, work with, and display processor traces.

A GenAI processor trace is a timeline of the input and output events that
were used in a GenAI processor. It includes the user input and potentially the
audio and/or video stream in the case of a realtime processor. The trace also
includes the function calls and responses made by the processor. Finally, it
includes the model output parts and any other arbitrary parts produced by the
processor. An event can also be a trace itself if a processor calls another one
internally.

A trace corresponds to a single processor call. If the processor is called
multiple times, multiple traces will be produced, each containing the input
used to call the processor and the output produced by the call.

__WARNING__: This is an incubating feature. The trace format is subject to
change and we do not guarantee backward compatibility at this stage.
"""

from __future__ import annotations

import abc
import contextlib
import contextvars
import datetime
from typing import Any

from absl import logging
from genai_processors import content_api
import pydantic
import shortuuid


pydantic_converter = pydantic.TypeAdapter(Any)


class Trace(pydantic.BaseModel, abc.ABC):
  """A trace of a processor call.

  A trace contains information about when the processor was called and
  includes methods to log input, output, and sub-traces to the trace.

  The _finalize method must be called to finalize the trace and release any
  resources; exiting the trace context does this automatically.

  It is up to the implementer to decide how to store the trace.

  The add_sub_trace method should be used to create a new nested trace.
  """

  model_config = {'arbitrary_types_allowed': True}

  # Name of the trace.
  name: str | None = None

  # A description of the processor that produced this trace, i.e. the arguments
  # used to construct the processor.
  processor_description: str | None = None

  # A unique ID for the trace.
  trace_id: str = pydantic.Field(default_factory=lambda: str(shortuuid.uuid()))

  # Whether the trace has just been created. This is used to decide between
  # creating a sub-trace when a processor is called and reusing the existing
  # trace when it has just been created.
  is_new: bool = False

  # The timestamp when the trace was started (the processor was called).
  start_time: datetime.datetime = pydantic.Field(
      default_factory=datetime.datetime.now
  )
  # The timestamp when the trace was ended (the processor returned).
  end_time: datetime.datetime | None = None

  _token: contextvars.Token[Trace | None] | None = pydantic.PrivateAttr(
      default=None
  )

  async def __aenter__(self) -> Trace:
    parent_trace = CURRENT_TRACE.get()

    if parent_trace:
      logging.warning(
          'Cannot enter a trace while another trace is already in scope: %s is'
          ' ignored in favor of %s',
          self,
          parent_trace,
      )

    self.is_new = True
    self._token = CURRENT_TRACE.set(self)
    return self

  async def __aexit__(
      self,
      exc_type: type[BaseException] | None,
      exc_val: BaseException | None,
      exc_tb: Any,
  ) -> None:
    if self._token is None:
      return

    self.end_time = datetime.datetime.now()
    CURRENT_TRACE.reset(self._token)
    await self._finalize()

  @abc.abstractmethod
  async def add_input(self, part: content_api.ProcessorPart) -> None:
    """Adds an input part to the trace events."""
    raise NotImplementedError()

  @abc.abstractmethod
  async def add_output(self, part: content_api.ProcessorPart) -> None:
    """Adds an output part to the trace events."""
    raise NotImplementedError()

  @abc.abstractmethod
  async def add_sub_trace(self, name: str) -> Trace:
    """Adds a sub-trace from a nested processor call to the trace events.

    Args:
      name: The name of the sub-trace.

    Returns:
      The trace that was added to the trace events.
    """
    # TODO(elisseeff, kibergus): consider adding a more generic relationship
    # between traces, e.g. traces generated one after another (with the + op)
    # or traces generated in parallel (with the // op).
    raise NotImplementedError()

  @abc.abstractmethod
  async def _finalize(self) -> None:
    """Finalizes the trace.

    At this stage, the trace is ready to be stored and/or displayed. It is up
    to the implementer to decide how to store the trace. When this function
    returns, the trace should be considered finalized and stored.
    """
    raise NotImplementedError()


CURRENT_TRACE: contextvars.ContextVar[Trace | None] = contextvars.ContextVar(
    'current_trace', default=None
)


@contextlib.asynccontextmanager
async def call_scope(processor_name: str):
  """Context manager for tracing a processor call."""
  parent_trace = CURRENT_TRACE.get()

  if parent_trace is None:
    # No tracing in scope - keep things as is.
    yield None
  elif parent_trace.is_new:
    # First call to a processor - reuse the root trace. It was created when
    # the trace scope was entered.
    parent_trace.name = processor_name
    parent_trace.is_new = False
    yield parent_trace
  else:
    # Parent is an existing trace: add a new sub-trace.
    async with await parent_trace.add_sub_trace(
        name=processor_name
    ) as new_trace:
      yield new_trace