Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions python/example/task/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Example usage of the Render Tasks Python SDK."""

import asyncio
import logging

from render_sdk.workflows import Options, Retry, start, task
Expand Down Expand Up @@ -47,6 +48,14 @@ def greet(name: str) -> str:
return f"Hello, {name}!"


@task
async def fan_out(n: int) -> list[int]:
"""Fan out a number into a list of numbers."""
squares = [square(i) for i in range(n)]
results = await asyncio.gather(*squares)
return results


if __name__ == "__main__":
logger.info("Starting Render Tasks example")
try:
Expand Down
39 changes: 24 additions & 15 deletions python/render_sdk/workflows/task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Task decorator and related functionality."""

import asyncio
import contextvars
import functools
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -104,28 +105,19 @@ def get_function(self, name: str) -> Callable | None:
return task_info.func


class TaskCallable:
"""A callable that can be awaited to run as a subtask."""
class TaskInstance:
"""Represents a single task execution that can be awaited."""

def __init__(self, func, name):
self._func = func
def __init__(self, name: str, future: asyncio.Task):
self._name = name
# Copy function attributes for introspection
functools.update_wrapper(self, func)

def __call__(self, *args, **kwargs):
# Store args for potential await
self._args = args
self._kwargs = kwargs
return self
self._future = future

def __await__(self):
"""Run as a subtask when awaited."""
"""Await the task execution."""

async def run_subtask():
try:
client = _current_client.get()
return await client.run_subtask(self._name, list(self._args))
return await self._future
except LookupError as e:
raise RuntimeError(
f"Cannot run {self._name} as subtask \
Expand All @@ -135,6 +127,23 @@ async def run_subtask():
return run_subtask().__await__()


class TaskCallable:
"""A callable that can be awaited to run as a subtask."""

def __init__(self, func, name):
self._func = func
self._name = name
# Copy function attributes for introspection
functools.update_wrapper(self, func)

def __call__(self, *args, **kwargs):
# Create a new TaskInstance for each call
client = _current_client.get()
# Start execution immediately
future = asyncio.create_task(client.run_subtask(self._name, list(args)))
return TaskInstance(self._name, future)


def create_task_decorator(registry: TaskRegistry) -> Callable:
"""
Create a task decorator bound to a specific registry.
Expand Down