From 81f8a2a44b6f48d3ead0f0023a59199c1d95a2cd Mon Sep 17 00:00:00 2001 From: Mark Benjamin Date: Fri, 10 Oct 2025 13:28:38 -0400 Subject: [PATCH 1/2] Use different instances (cherry picked from commit 4248ce8a0d2951c48c15a1d38caa3549df7feef7) --- python/render_sdk/workflows/task.py | 39 ++++++++++++++++++----------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/python/render_sdk/workflows/task.py b/python/render_sdk/workflows/task.py index f883965..7ac0865 100644 --- a/python/render_sdk/workflows/task.py +++ b/python/render_sdk/workflows/task.py @@ -1,5 +1,6 @@ """Task decorator and related functionality.""" +import asyncio import contextvars import functools from abc import ABC, abstractmethod @@ -104,28 +105,19 @@ def get_function(self, name: str) -> Callable | None: return task_info.func -class TaskCallable: - """A callable that can be awaited to run as a subtask.""" +class TaskInstance: + """Represents a single task execution that can be awaited.""" - def __init__(self, func, name): - self._func = func + def __init__(self, name: str, future: asyncio.Task): self._name = name - # Copy function attributes for introspection - functools.update_wrapper(self, func) - - def __call__(self, *args, **kwargs): - # Store args for potential await - self._args = args - self._kwargs = kwargs - return self + self._future = future def __await__(self): - """Run as a subtask when awaited.""" + """Await the task execution.""" async def run_subtask(): try: - client = _current_client.get() - return await client.run_subtask(self._name, list(self._args)) + return await self._future except LookupError as e: raise RuntimeError( f"Cannot run {self._name} as subtask \ @@ -135,6 +127,23 @@ async def run_subtask(): return run_subtask().__await__() +class TaskCallable: + """A callable that can be awaited to run as a subtask.""" + + def __init__(self, func, name): + self._func = func + self._name = name + # Copy function attributes for introspection + functools.update_wrapper(self, func) + + def __call__(self, *args, **kwargs): + # Create a new TaskInstance for each call + client = _current_client.get() + # Start execution immediately + future = asyncio.create_task(client.run_subtask(self._name, list(args))) + return TaskInstance(self._name, future) + + def create_task_decorator(registry: TaskRegistry) -> Callable: """ Create a task decorator bound to a specific registry. From 784bf15af73b55a3a5fad40289435698659c8260 Mon Sep 17 00:00:00 2001 From: Mark Benjamin Date: Fri, 10 Oct 2025 12:17:51 -0400 Subject: [PATCH 2/2] Add fan out example (cherry picked from commit 422d8eabf11a5cbd454f27d0a76f632877d595ed) --- python/example/task/main.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/example/task/main.py b/python/example/task/main.py index 90d2c8e..e6276dd 100644 --- a/python/example/task/main.py +++ b/python/example/task/main.py @@ -1,5 +1,6 @@ """Example usage of the Render Tasks Python SDK.""" +import asyncio import logging from render_sdk.workflows import Options, Retry, start, task @@ -47,6 +48,14 @@ def greet(name: str) -> str: return f"Hello, {name}!" +@task +async def fan_out(n: int) -> list[int]: + """Fan out a number into a list of numbers.""" + squares = [square(i) for i in range(n)] + results = await asyncio.gather(*squares) + return results + + if __name__ == "__main__": logger.info("Starting Render Tasks example") try: