diff --git a/python/example/task/main.py b/python/example/task/main.py index 90d2c8e..e6276dd 100644 --- a/python/example/task/main.py +++ b/python/example/task/main.py @@ -1,5 +1,6 @@ """Example usage of the Render Tasks Python SDK.""" +import asyncio import logging from render_sdk.workflows import Options, Retry, start, task @@ -47,6 +48,14 @@ def greet(name: str) -> str: return f"Hello, {name}!" +@task +async def fan_out(n: int) -> list[int]: + """Fan out a number into a list of numbers.""" + squares = [square(i) for i in range(n)] + results = await asyncio.gather(*squares) + return results + + if __name__ == "__main__": logger.info("Starting Render Tasks example") try: diff --git a/python/render_sdk/workflows/task.py b/python/render_sdk/workflows/task.py index f883965..7ac0865 100644 --- a/python/render_sdk/workflows/task.py +++ b/python/render_sdk/workflows/task.py @@ -1,5 +1,6 @@ """Task decorator and related functionality.""" +import asyncio import contextvars import functools from abc import ABC, abstractmethod @@ -104,28 +105,19 @@ def get_function(self, name: str) -> Callable | None: return task_info.func -class TaskCallable: - """A callable that can be awaited to run as a subtask.""" +class TaskInstance: + """Represents a single task execution that can be awaited.""" - def __init__(self, func, name): - self._func = func + def __init__(self, name: str, future: asyncio.Task): self._name = name - # Copy function attributes for introspection - functools.update_wrapper(self, func) - - def __call__(self, *args, **kwargs): - # Store args for potential await - self._args = args - self._kwargs = kwargs - return self + self._future = future def __await__(self): - """Run as a subtask when awaited.""" + """Await the task execution.""" async def run_subtask(): try: - client = _current_client.get() - return await client.run_subtask(self._name, list(self._args)) + return await self._future except LookupError as e: raise RuntimeError( f"Cannot run {self._name} as subtask \ @@ -135,6 +127,23 @@ async def run_subtask(): return run_subtask().__await__() +class TaskCallable: + """A callable that can be awaited to run as a subtask.""" + + def __init__(self, func, name): + self._func = func + self._name = name + # Copy function attributes for introspection + functools.update_wrapper(self, func) + + def __call__(self, *args, **kwargs): + # Create a new TaskInstance for each call + client = _current_client.get() + # Start execution immediately + future = asyncio.create_task(client.run_subtask(self._name, list(args))) + return TaskInstance(self._name, future) + + def create_task_decorator(registry: TaskRegistry) -> Callable: """ Create a task decorator bound to a specific registry.