
Conversation

@kiranandcode (Contributor) commented Nov 17, 2025

This is in draft mode as it's very WIP, but following discussions with @jfeser, this branch proposes an alternative implementation of async support for effectful through concurrent.futures.

Main design complexities (so far):

  • for thread safety, we need to make the interpretation thread-local, but new threads do not inherit the values of thread-local variables (they see fresh instances), so we need to save the runtime context at the time of submission and restore it inside the submitted task (see the sketch after the example below).
from concurrent.futures import Future, ThreadPoolExecutor

from effectful.handlers.futures import ThreadPoolFuturesInterpretation, Executor
from effectful.ops.semantics import NotHandled, defop, handler


@defop
def add(n: int, k: int) -> int:
    raise NotHandled


@defop
def a_mul(n: int, k: int) -> Future[int]:
    raise NotHandled


def a_mul_interp(n, k):
    def mul(a: int, b: int):
        a, b = (-a, -b) if a < 0 else (a, b)
        res: int = 0
        while a > 0:
            a, res = a - 1, add(res, b)
        return res

    return Executor.submit(mul, n, k)


with handler(ThreadPoolFuturesInterpretation()), handler({a_mul: a_mul_interp}):
    fut: Future[int] = a_mul(2, 10)
    # with handler({add: lambda a,b: a + b}): # wouldn't change result
    res = fut.result(None)
    assert res == add(add(0, 10), 10)
    with handler({add: lambda a, b: a + b}):
        fut: Future[int] = a_mul(2, 10)
        assert fut.result() == 20
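
A minimal sketch of the save-and-restore idea from the bullet above (hypothetical: the names _local and submit_with_context are illustrative and not the branch's actual code; it assumes the interpretation lives in a threading.local). The context is captured in the submitting thread at submission time and reinstalled inside the task before the user function runs:

import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

_local = threading.local()


def submit_with_context(
    executor: ThreadPoolExecutor, fn: Callable[..., Any], /, *args: Any, **kwargs: Any
) -> Future:
    # Snapshot the submitting thread's interpretation at submission time.
    saved = getattr(_local, "interpretation", {})

    def task() -> Any:
        # The worker thread sees a fresh threading.local, so reinstall the
        # snapshot before running the submitted function, and undo it after.
        previous = getattr(_local, "interpretation", {})
        _local.interpretation = saved
        try:
            return fn(*args, **kwargs)
        finally:
            _local.interpretation = previous

    return executor.submit(task)

If the interpretation were held in a contextvars.ContextVar instead, contextvars.copy_context plus Context.run would achieve the same capture-and-restore effect.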

@kiranandcode changed the title from "Alternative proposal for async support in effectful" to "Alternative proposal for concurrent.futures support in effectful" on Nov 18, 2025
@jfeser (Contributor) left a comment


I think this looks like it's moving in good directions. The simple API is a good sign.

I don't like the look of release_handler_lock and friends. I'm surprised that nothing in the LLM provider code seems to call it, which suggests that it must be called by users of that code? I definitely don't think we want this to be part of the public API. My suggestion is to leave this extra API out until we have a clear need for it.

Comment on lines +187 to +190
@dataclass(frozen=True)
class DoneAndNotDoneFutures[T]:
    done: set[Future[T]]
    not_done: set[Future[T]]
@jfeser (Contributor):

Isn't this defined in the concurrent.futures library? Do we need our own dataclass definition?

@kiranandcode (Contributor Author) Nov 18, 2025:

It's a namedtuple in the library; I added this dataclass because @defop ... using the namedtuple type was not allowing .done, iirc (or there could have been another issue causing this to fail and I misdiagnosed the cause).

@jfeser (Contributor):

There shouldn't be a behavioral difference in effectful between the dataclass and the namedtuple for this simple case, so it's probably worth creating a self-contained test that passes with the dataclass and fails with the namedtuple so that we can try to fix it.
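
A rough sketch of what such a test might look like (hypothetical: wait_all and wait_all_impl are illustrative names; it reuses the defop/handler imports from the example above together with the stdlib namedtuple returned by concurrent.futures.wait):

import concurrent.futures as futures
from concurrent.futures import Future
from concurrent.futures._base import DoneAndNotDoneFutures  # the stdlib namedtuple (private module)

from effectful.ops.semantics import NotHandled, defop, handler


@defop
def wait_all(futs: list[Future[int]]) -> DoneAndNotDoneFutures:
    raise NotHandled


def wait_all_impl(futs: list[Future[int]]) -> DoneAndNotDoneFutures:
    # concurrent.futures.wait returns the namedtuple directly.
    return futures.wait(futs)


def test_namedtuple_result_exposes_done() -> None:
    fut: Future[int] = Future()
    fut.set_result(1)
    with handler({wait_all: wait_all_impl}):
        res = wait_all([fut])
        # Reportedly fails when the operation returns the stdlib namedtuple but
        # passes with the frozen dataclass; this test would pin down that difference.
        assert fut in res.done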

@kiranandcode (Contributor Author):

Oh yep will do!

@kiranandcode (Contributor Author) commented:

> I don't like the look of release_handler_lock and friends. I'm surprised that nothing in the LLM provider code seems to call it, which suggests that it must be called by users of that code? I definitely don't think we want this to be part of the public API. My suggestion is to leave this extra API out until we have a clear need for it.

Oh, yep, that was enabled by the realisation that Executor.submit's function is evaluated in a new thread, so the submitted code doesn't hold the lock at all. So release_handler_lock can probably be removed.

@kiranandcode added this to the LLM Infrastructure milestone on Nov 18, 2025
@kiranandcode (Contributor Author) commented:

Proposed interface is now:

if __name__ == "__main__":
    from concurrent import futures
    from concurrent.futures import Future

    from openai import OpenAI
    from effectful.ops.syntax import implements
    from effectful.ops.semantics import fwd

    openai_provider = OpenAIAPIProvider(client=OpenAI())
    user_request: str = "write a fibonacci function"

    def safe_synthesis_get[T](future: Future[T]) -> T | None:
        try:
            return future.result()
        except SynthesisError:
            return None

    with handler(ThreadPoolFuturesInterpretation()), handler(openai_provider), handler(ProgramSynthesis()):
        # Launch multiple LLM calls concurrently
        tasks = [Executor.submit(generate_program, user_request + f" {i}") for i in range(3)]
        results_as_completed = (safe_synthesis_get(f) for f in futures.as_completed(tasks))
        valid_results = list(result for result in results_as_completed if result)
        best_program = next(result for result in valid_results)

    print(f"programs were {valid_results}")
    print(f"best_program was {getattr(best_program, '__src__', None)}")
