Skip to content

Conversation

@kiranandcode
Copy link
Contributor

@kiranandcode kiranandcode commented Nov 7, 2025

On the ongoing discussion regarding async support for effectful (#388), in particular for LLM-based interfaces, this PR implements a minor modification that allows async coroutines to flow through the usual effectful boilerplate. To an end client, the effectful API would end up looking as follows:

@defop
def add(x: int, y: int) -> int: raise NotHandled

@defop
def multiply(x: int, y: int) -> int: raise NotHandled

@defop
async def async_fetch_number(key: str) -> int: raise NotHandled

@defop
async def async_square(x: int) -> int: raise NotHandled

def client_sync_code():
    return multiply(add(1,2), 3)

async def client_code():
    a = await async_fetch_number('a')
    b = await async_fetch_number('b')
    doubled = multiply(add(a, b), 2)
    c = await async_fetch_number('c')
    c_squared = await async_square(c)
    result = add(doubled, c_squared)
    return result

async def fetch_handler(key: str) -> int: await asyncio.sleep(0.1); return int(ord(key))
async def square_handler(x: int) -> int: await asyncio.sleep(0.05); return x * x

interpretation = {
    async_fetch_number: fetch_handler, async_square: square_handler,
    add: (lambda x, y: x + y), multiply: (lambda x, y: x * y)
}


async def main():
    # without interpretation
    term = await client_code()
    term_sync = client_sync_code()
    print(f'term is {term}')
    print(f'term_sync is {term_sync}')
    # with interpretation
    with handler(interpretation):
        result = await client_code()
        result_sync = client_sync_code()
        print(f'result is {result}')
        print(f'result_sync is {result_sync}')

    # with interpretation
    with handler(interpretation):
        result = await evaluate(term)
        result_sync = evaluate(term_sync)
        print(f'evaluated result is {result}')
        print(f'evaluated result_sync is {result_sync}')
asyncio.run(main())
# term is add(multiply(add(async_fetch_number(a), async_fetch_number(b)), 2), async_square(async_fetch_number(c)))
# term_sync is multiply(add(1, 2), 3)
# result is 10191
# result_sync is 9
# evaluated result is 10191
# evaluated result_sync is 9

Pros:

  • async code just works
  • sync code is unchanged
  • you can mix sync and async code without thinking about it

Cons:

  • the deferred nature of async coroutines and how that interacts with the global runtime handler might be an easy source of footguns.

@kiranandcode kiranandcode added this to the LLM Infrastructure milestone Nov 7, 2025
@kiranandcode
Copy link
Contributor Author

async def main():
    with handler({foo: foo_handler}):
        t = foo(bar(8))
        # t is a term
    print(t)
    with handler({foo: foo_handler2, bar: bar_handler}):
        res = evaluate(t)
    print(res)


    with handler({foo: foo_handler}):
        t = foo(await bar_async(8))
        # t is a term
    print(t)
    with handler({foo: foo_handler2, bar_async: bar_async_handler}):
        res = await evaluate(t)
    print(res)

Behaves the same.

foo(bar(8))
20
foo(bar_async(8))
20

@kiranandcode
Copy link
Contributor Author

Add tests interleaving coroutine forcing and handler installation

@kiranandcode
Copy link
Contributor Author

kiranandcode commented Nov 7, 2025

async def main():

    with handler({foo: foo_handler}):
        t = foo(await bar_async(8))
        # t is a term
    print(t)
    with handler({foo: foo_handler2, bar_async: bar_async_handler}):
        res_async = evaluate(t)
    with handler({foo: foo_handler3 }):
         res = await res_async
    print(res)

@kiranandcode
Copy link
Contributor Author

kiranandcode commented Nov 7, 2025

Points in the design space to flesh out:

  • typeof
  • interaction with apply

await as an explicit term.

kwargs_evaluated = {k: evaluate(v) for k, v in expr.kwargs.items()}

# Check if any evaluated args/kwargs are coroutines
has_coro = any(inspect.iscoroutine(a) for a in args_evaluated) or any(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to push this second traversal into a branch of the first traversal in evaluate? I think the data structure traversal logic in the rest of evaluate is necessary for this to be correct in general (what if you have a list of coroutines in an argument?), so we either have to duplicate it or reuse it.



@defop
def await_[**P, T](op: Operation[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something I still think is confusing here is the type of async Operations. I'd expect await_ to take Operations that return coroutines and unbox them to return a value:

def await_[**P, T](
    op: Operation[P, Coroutine[T]], *args, **kwargs
) -> T:
    ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the correct signature or maybe we should have AsyncOperation.

If we go for the first option, then morally defop will have to modify the signature and wrap the return type in a coroutine (not sure if there's anything that specifically depends on defop maintaining the type but design wise it should be doing this)

if we go for the second option, then this might require more changes throughout the codebase if code relies on receiving an operation, and now we'd need to make it support either (probably no runtime changes, but design wise the signatures might need to be modified). AsyncOperation might be a subtype of Operation which might cut down on this.

try:
from effectful.ops.semantics import await_

if self._is_async:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to encapsulate this logic in a separate Operation or _BaseOperation subtype _AsyncOperation instead of carrying this flag around.

# Return the term representation
return typing.cast(
Callable[Concatenate[Operation[Q, V], Q], Expr[V]], defdata
)(await_, self, *args, **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is the only place where await_ is introduced, but I'm not sure if that's right? That may need to happen in Operation.__call__ instead. We should clarify the invariants we're trying to maintain around await_ and async Operations.

@kiranandcode
Copy link
Contributor Author

kiranandcode commented Nov 12, 2025

  • BaseAsyncOperation (CallableTerm)
  • @defop await()
  • smart constructor for modify signature to wrap in coroutine
t = [add(1, await async_add(1,2))] : T
# add(1, await_(async_add, 1, 2))

with handler({apply: ...}):
    res = evaluate(t) # 4

with handler({async_add: ...}):
    res = await evaluate(t) # 4
  • changing types is out of bounds
  • __await__
  • evaluate()
def foo():
   yield 1
   return

def foo_eval():
   eval("yield 1")
   return 

is there some way of evaluating an await in the context of the parent caller.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants