From 6fde9099b93bc2d18d991a52efbb9058be903c49 Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Mon, 17 Nov 2025 18:41:01 -0500 Subject: [PATCH 1/9] initial commit --- effectful/handlers/futures/__init__.py | 273 +++++++++++++++++++++++++ effectful/internals/runtime.py | 13 +- effectful/ops/syntax.py | 65 ++++++ 3 files changed, 347 insertions(+), 4 deletions(-) create mode 100644 effectful/handlers/futures/__init__.py diff --git a/effectful/handlers/futures/__init__.py b/effectful/handlers/futures/__init__.py new file mode 100644 index 00000000..1f231e26 --- /dev/null +++ b/effectful/handlers/futures/__init__.py @@ -0,0 +1,273 @@ +""" +Futures handler for effectful - provides integration with concurrent.futures. + +This module provides operations for working with concurrent.futures, allowing +effectful operations to be executed asynchronously in thread pools with +automatic preservation of interpretation context. +""" + +import concurrent.futures as futures +import functools +from collections.abc import Callable, Iterable +from concurrent.futures import Executor as FuturesExecutor +from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor +from dataclasses import dataclass +from typing import Literal + +from effectful.ops.semantics import defop +from effectful.ops.syntax import ObjectInterpretation, defdata, implements +from effectful.ops.types import NotHandled, Term + + +class Executor: + """Namespace for executor-related operations.""" + + @staticmethod + @defop # type: ignore + def submit[**P, T]( + task: Callable[P, T], *args: P.args, **kwargs: P.kwargs + ) -> Future[T]: + """ + Submit a task for asynchronous execution. + + This operation should be handled by providing a FuturesInterpretation + which automatically preserves the interpretation context across thread boundaries. + + :param task: The callable to execute asynchronously + :param args: Positional arguments for the task + :param kwargs: Keyword arguments for the task + :return: A Future representing the asynchronous computation + + Example: + >>> from concurrent.futures import ThreadPoolExecutor + >>> from effectful.handlers.futures import ThreadPoolFuturesInterpretation + >>> from effectful.ops.semantics import handler + >>> + >>> pool = ThreadPoolExecutor() + >>> with handler(ThreadPoolFuturesInterpretation(pool)): + >>> future = Executor.submit(my_function, arg1, arg2) + """ + raise NotHandled + + @staticmethod + @defop + def map[T, R]( + func: Callable[[T], R], + *iterables: Iterable[T], + timeout: float | None = None, + chunksize: int = 1, + ) -> Iterable[R]: + """ + Map a function over iterables, executing asynchronously. + + Returns an iterator yielding results as they complete. Equivalent to + map(func, *iterables) but executes asynchronously. + + This operation should be handled by providing a FuturesInterpretation + which automatically preserves the interpretation context across thread boundaries. + + :param func: The function to map over the iterables + :param iterables: One or more iterables to map over + :param timeout: Maximum time to wait for a result (default: None) + :param chunksize: Size of chunks for ProcessPoolExecutor (default: 1) + :return: An iterator yielding results + + Example: + >>> from effectful.handlers.futures import ThreadPoolFuturesInterpretation + >>> from effectful.ops.semantics import handler + >>> + >>> def square(x): + >>> return x ** 2 + >>> + >>> with handler(ThreadPoolFuturesInterpretation()): + >>> results = list(Executor.map(square, range(10))) + >>> print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] + """ + raise NotHandled + + +class FuturesInterpretation(ObjectInterpretation): + """ + Base interpretation for concurrent.futures executors. + + This interpretation automatically preserves the effectful interpretation context + when submitting tasks to worker threads, ensuring that effectful operations + work correctly across thread boundaries. + """ + + def __init__(self, executor): + """ + Initialize the futures interpretation. + + :param executor: The executor to use (ThreadPoolExecutor or ProcessPoolExecutor) + """ + super().__init__() + self.executor: FuturesExecutor = executor + + def shutdown(self, *args, **kwargs): + self.executor.shutdown(*args, **kwargs) + + @implements(Executor.submit) + def submit(self, task: Callable, *args, **kwargs) -> Future: + """ + Submit a task to the executor with automatic context preservation. + + Captures the current interpretation context and ensures it is restored + in the worker thread before executing the task. + """ + from effectful.internals.runtime import get_interpretation, interpreter + + # Capture the current interpretation context + context = get_interpretation() + + @functools.wraps(task) + def wrapped_task(*task_args, **task_kwargs): + # Restore the interpretation context in the worker thread + with interpreter(context): + return task(*task_args, **task_kwargs) + + # Submit the wrapped task to the underlying executor + return self.executor.submit(wrapped_task, *args, **kwargs) + + @implements(Executor.map) + def map(self, func: Callable, *iterables, timeout=None, chunksize=1): + """ + Map a function over iterables with automatic context preservation. + + Captures the current interpretation context and ensures it is restored + in each worker thread before executing the function. + """ + from effectful.internals.runtime import get_interpretation, interpreter + + # Capture the current interpretation context + context = get_interpretation() + + @functools.wraps(func) + def wrapped_func(*args, **kwargs): + # Restore the interpretation context in the worker thread + with interpreter(context): + return func(*args, **kwargs) + + # Call the executor's map with the wrapped function + return self.executor.map( + wrapped_func, *iterables, timeout=timeout, chunksize=chunksize + ) + + +class ThreadPoolFuturesInterpretation(FuturesInterpretation): + """ + Interpretation for ThreadPoolExecutor with automatic context preservation. + + Example: + >>> from concurrent.futures import ThreadPoolExecutor, Future + >>> from effectful.ops.semantics import defop, handler + >>> from effectful.handlers.futures import Executor, ThreadPoolFuturesInterpretation + >>> + >>> @defop + >>> def async_pow(n: int, k: int) -> Future[int]: + >>> return Executor.submit(pow, n, k) + >>> + >>> pool = ThreadPoolExecutor() + >>> with handler(ThreadPoolFuturesInterpretation(pool)): + >>> result = async_pow(2, 10).result() + >>> print(result) # 1024 + """ + + def __init__(self, max_workers=None): + """ + Initialize with a ThreadPoolExecutor. + + :param max_workers: Maximum number of worker threads (default: None, uses default from ThreadPoolExecutor) + """ + super().__init__(ThreadPoolExecutor(max_workers=max_workers)) + + +class ProcessPoolFuturesInterpretation(FuturesInterpretation): + """ + Interpretation for ProcessPoolExecutor with automatic context preservation. + + Note: Context preservation across processes has limitations due to pickling constraints. + """ + + def __init__(self, max_workers=None): + """ + Initialize with a ProcessPoolExecutor. + + :param max_workers: Maximum number of worker processes (default: None, uses default from ProcessPoolExecutor) + """ + super().__init__(ProcessPoolExecutor(max_workers=max_workers)) + + +type ReturnOptions = Literal["All_COMPLETED", "FIRST_COMPLETED", "FIRST_EXCEPTION"] + + +@dataclass(frozen=True) +class DoneAndNotDoneFutures[T]: + done: set[Future[T]] + not_done: set[Future[T]] + + +@defdata.register(DoneAndNotDoneFutures) +class _DoneAndNotDoneFuturesTerm[T](Term[DoneAndNotDoneFutures[T]]): + """Term representing a DoneAndNotDoneFutures result.""" + + def __init__(self, op, *args, **kwargs): + self._op = op + self._args = args + self._kwargs = kwargs + + @property + def op(self): + return self._op + + @property + def args(self): + return self._args + + @property + def kwargs(self): + return self._kwargs + + @defop # type: ignore[prop-decorator] + @property + def done(self) -> set[Future[T]]: + """Get the set of done futures.""" + if not isinstance(self, Term): + return self.done # type: ignore + else: + raise NotHandled + + @defop # type: ignore[prop-decorator] + @property + def not_done(self) -> set[Future[T]]: + """Get the set of not done futures.""" + if not isinstance(self, Term): + return self.not_done # type: ignore + else: + raise NotHandled + + +@defop +def wait[T]( + fs: Iterable[Future[T]], + timeout: int | None = None, + return_when: ReturnOptions = futures.ALL_COMPLETED, # type: ignore +) -> DoneAndNotDoneFutures[T]: + if ( + isinstance(timeout, Term) + or isinstance(return_when, Term) + or any(not isinstance(t, Future) for t in fs) + ): + raise NotHandled + print(f"wait called with {fs} {timeout} {return_when}") + return futures.wait(fs, timeout, return_when) # type: ignore + + +@defop +def as_completed[T]( + fs: Iterable[Future[T]], + timeout: int | None = None, +) -> Iterable[Future[T]]: + if isinstance(timeout, Term) or any(isinstance(t, Term) for t in fs): + raise NotHandled + return futures.as_completed(fs, timeout) diff --git a/effectful/internals/runtime.py b/effectful/internals/runtime.py index f99472fe..a954f36d 100644 --- a/effectful/internals/runtime.py +++ b/effectful/internals/runtime.py @@ -1,20 +1,25 @@ import contextlib -import dataclasses import functools +import threading from collections.abc import Callable, Mapping from effectful.ops.syntax import defop from effectful.ops.types import Interpretation, Operation -@dataclasses.dataclass -class Runtime[S, T]: +class Runtime[S, T](threading.local): + """Thread-local runtime for effectful interpretations.""" + interpretation: "Interpretation[S, T]" + def __init__(self): + super().__init__() + self.interpretation = {} + @functools.lru_cache(maxsize=1) def get_runtime() -> Runtime: - return Runtime(interpretation={}) + return Runtime() def get_interpretation(): diff --git a/effectful/ops/syntax.py b/effectful/ops/syntax.py index fc2d753c..62a08c84 100644 --- a/effectful/ops/syntax.py +++ b/effectful/ops/syntax.py @@ -9,6 +9,7 @@ import typing import warnings from collections.abc import Callable, Iterable, Mapping +from concurrent.futures import Future from typing import Annotated, Any, Concatenate from effectful.ops.types import Annotation, Expr, NotHandled, Operation, Term @@ -1673,3 +1674,67 @@ class _IntegralTerm[T: numbers.Integral](_RationalTerm[T]): @defdata.register(bool) class _BoolTerm[T: bool](_IntegralTerm[T]): # type: ignore pass + + +# Future support +@defdata.register(Future) +class _FutureTerm[T](_BaseTerm[Future[T]]): + """Term representing a Future computation.""" + + @defop + def result(self: Future[T], timeout: float | None = None) -> T: + """Get the result of the future.""" + if not isinstance(self, Term): + return self.result(timeout=timeout) + else: + raise NotHandled + + @defop + def exception( + self: Future[T], timeout: float | None = None + ) -> BaseException | None: + """Get the exception from the future, if any.""" + if not isinstance(self, Term): + return self.exception(timeout=timeout) + else: + raise NotHandled + + @defop + def cancel(self: Future[T]) -> bool: + """Attempt to cancel the future.""" + if not isinstance(self, Term): + return self.cancel() + else: + raise NotHandled + + @defop + def cancelled(self: Future[T]) -> bool: + """Check if the future was cancelled.""" + if not isinstance(self, Term): + return self.cancelled() + else: + raise NotHandled + + @defop + def done(self: Future[T]) -> bool: + """Check if the future is done.""" + if not isinstance(self, Term): + return self.done() + else: + raise NotHandled + + @defop + def running(self: Future[T]) -> bool: + """Check if the future is currently running.""" + if not isinstance(self, Term): + return self.running() + else: + raise NotHandled + + @defop + def add_done_callback(self: Future[T], fn: Callable[[Future[T]], None]) -> None: + """Add a callback to be called when the future completes.""" + if not isinstance(self, Term): + return self.add_done_callback(fn) + else: + raise NotHandled From 3dd752d6d7bbacd4cbe4646a170b865f1cc8fc96 Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Tue, 18 Nov 2025 11:19:34 -0500 Subject: [PATCH 2/9] added thread safety and test --- effectful/handlers/futures/__init__.py | 9 +- effectful/internals/runtime.py | 50 ++++++ effectful/ops/semantics.py | 23 ++- tests/test_handlers_futures.py | 226 +++++++++++++++++++++++++ 4 files changed, 294 insertions(+), 14 deletions(-) create mode 100644 tests/test_handlers_futures.py diff --git a/effectful/handlers/futures/__init__.py b/effectful/handlers/futures/__init__.py index 1f231e26..98644e22 100644 --- a/effectful/handlers/futures/__init__.py +++ b/effectful/handlers/futures/__init__.py @@ -9,7 +9,6 @@ import concurrent.futures as futures import functools from collections.abc import Callable, Iterable -from concurrent.futures import Executor as FuturesExecutor from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import dataclass from typing import Literal @@ -95,14 +94,14 @@ class FuturesInterpretation(ObjectInterpretation): work correctly across thread boundaries. """ - def __init__(self, executor): + def __init__(self, executor: futures.Executor): """ Initialize the futures interpretation. :param executor: The executor to use (ThreadPoolExecutor or ProcessPoolExecutor) """ super().__init__() - self.executor: FuturesExecutor = executor + self.executor: futures.Executor = executor def shutdown(self, *args, **kwargs): self.executor.shutdown(*args, **kwargs) @@ -233,7 +232,7 @@ def kwargs(self): def done(self) -> set[Future[T]]: """Get the set of done futures.""" if not isinstance(self, Term): - return self.done # type: ignore + return self.done else: raise NotHandled @@ -242,7 +241,7 @@ def done(self) -> set[Future[T]]: def not_done(self) -> set[Future[T]]: """Get the set of not done futures.""" if not isinstance(self, Term): - return self.not_done # type: ignore + return self.not_done else: raise NotHandled diff --git a/effectful/internals/runtime.py b/effectful/internals/runtime.py index a954f36d..0fac8850 100644 --- a/effectful/internals/runtime.py +++ b/effectful/internals/runtime.py @@ -6,6 +6,9 @@ from effectful.ops.syntax import defop from effectful.ops.types import Interpretation, Operation +# Global reentrant lock for mutual exclusion of handler execution +_handler_lock = threading.RLock() + class Runtime[S, T](threading.local): """Thread-local runtime for effectful interpretations.""" @@ -17,6 +20,53 @@ def __init__(self): self.interpretation = {} +def get_handler_lock() -> threading.RLock: + """Get the global handler execution lock. + + This lock ensures mutual exclusion for handler execution by default. + Use release_handler_lock() to temporarily release it for concurrent operations. + """ + return _handler_lock + + +@contextlib.contextmanager +def release_handler_lock(): + """Context manager to temporarily release the handler lock. + + Use this when performing I/O operations or other blocking calls + that should allow other threads to execute handlers concurrently. + + Example: + @defop + def llm_call(prompt: str) -> str: + with release_handler_lock(): + # HTTP request can run while other threads execute handlers + response = requests.post(url, json={"prompt": prompt}) + return response.json()["result"] + """ + lock = get_handler_lock() + lock.release() + try: + yield + finally: + lock.acquire() + + +@contextlib.contextmanager +def acquire_handler_lock(): + """Context manager to acquire the handler lock. + + This is called automatically by apply() for handler execution. + Most users won't need to call this directly. + """ + lock = get_handler_lock() + lock.acquire() + try: + yield + finally: + lock.release() + + @functools.lru_cache(maxsize=1) def get_runtime() -> Runtime: return Runtime() diff --git a/effectful/ops/semantics.py b/effectful/ops/semantics.py index cc3857a7..7b4e3143 100644 --- a/effectful/ops/semantics.py +++ b/effectful/ops/semantics.py @@ -19,6 +19,10 @@ def apply[**P, T](op: Operation[P, T], *args: P.args, **kwargs: P.kwargs) -> T: """Apply ``op`` to ``args``, ``kwargs`` in interpretation ``intp``. + Handler execution is mutually exclusive by default - only one thread can + execute handlers at a time. Use :func:`release_handler_lock` to temporarily + release the lock for I/O operations that should allow concurrent execution. + Handling :func:`apply` changes the evaluation strategy of terms. **Example usage**: @@ -45,15 +49,16 @@ def apply[**P, T](op: Operation[P, T], *args: P.args, **kwargs: P.kwargs) -> T: mul(add(1, 2), 3) """ - from effectful.internals.runtime import get_interpretation - - intp = get_interpretation() - if op in intp: - return intp[op](*args, **kwargs) - elif apply in intp: - return intp[apply](op, *args, **kwargs) - else: - return op.__default_rule__(*args, **kwargs) # type: ignore + from effectful.internals.runtime import acquire_handler_lock, get_interpretation + + with acquire_handler_lock(): + intp = get_interpretation() + if op in intp: + return intp[op](*args, **kwargs) + elif apply in intp: + return intp[apply](op, *args, **kwargs) + else: + return op.__default_rule__(*args, **kwargs) # type: ignore @defop diff --git a/tests/test_handlers_futures.py b/tests/test_handlers_futures.py new file mode 100644 index 00000000..de79fdf3 --- /dev/null +++ b/tests/test_handlers_futures.py @@ -0,0 +1,226 @@ +""" +Tests for the futures handler (effectful.handlers.futures). + +This module tests the integration of concurrent.futures with effectful, +including context preservation across thread boundaries. +""" + +import time +from concurrent.futures import Future + +import effectful.handlers.futures as futures +from effectful.handlers.futures import ( + Executor, + ThreadPoolFuturesInterpretation, +) +from effectful.internals.runtime import release_handler_lock +from effectful.ops.semantics import NotHandled, defop, evaluate, handler +from effectful.ops.types import Term + + +@defop +def add(x: int, y: int) -> int: + raise NotHandled + + +@defop +def a_mul(x: int, y: int) -> Future[int]: + raise NotHandled + + +@defop +def a_div(x: int, y: int) -> Future[int]: + raise NotHandled + + +@defop +def a_fac(n: int) -> Future[int]: + raise NotHandled + + +def test_uninterp_async(): + """calling async func without interpretation returns term""" + t = a_div(10, 20) + assert isinstance(t, Term) + + +def test_mutual_exclusion(): + """Test that handler execution is mutually exclusive by default. + + Without mutual exclusion, the race condition in add_interp would cause + add_calls to be less than 10. With mutual exclusion, we're guaranteed + to get exactly 10 calls. + """ + add_calls = 0 + + def add_interp(x: int, y: int) -> int: + nonlocal add_calls + no_calls = add_calls + time.sleep(0.001) + add_calls = no_calls + 1 + return x + y + + def client(x: int): + return add(x, x) + + with ( + handler(ThreadPoolFuturesInterpretation(max_workers=4)), + handler({add: add_interp}), + ): + _ = sum(Executor.map(client, list(range(10)))) + # With mutual exclusion, we're guaranteed to get exactly 10 + assert add_calls == 10 + + +def test_release_lock_for_concurrent_io(): + """Test that release_handler_lock allows concurrent I/O operations. + + This demonstrates the pattern for handlers that perform I/O and want + to allow other handlers to run concurrently during the I/O wait. + """ + from effectful.internals.runtime import release_handler_lock + + io_calls = 0 + concurrent_ios = 0 + max_concurrent = 0 + + @defop + def io_operation(x: int) -> int: + """Simulates a slow io operation""" + raise NotHandled + + def io_interp(x: int) -> int: + nonlocal io_calls, concurrent_ios, max_concurrent + # update state using lock + io_calls += 1 + + # release lock for IO + with release_handler_lock(): + concurrent_ios += 1 + max_concurrent = max(max_concurrent, concurrent_ios) + time.sleep(0.01) # Simulate I/O wait + concurrent_ios -= 1 + return x * 2 + + def client(x: int): + return io_operation(x) + + with ( + handler(ThreadPoolFuturesInterpretation(max_workers=4)), + handler({io_operation: io_interp}), + ): + results = list(Executor.map(client, list(range(10)))) + + assert io_calls == 10 + assert results == [x * 2 for x in range(10)] + # With release_handler_lock, multiple I/O operations can run concurrently + assert max_concurrent > 1, ( + f"Expected concurrent I/O, got max_concurrent={max_concurrent}" + ) + + +def test_wait_several_futures(): + def client_code(): + results = [] + for fut in futures.wait([a_div(3, 4), a_mul(4, 5)]).done: + results.append(fut.result()) # noqa: PERF401 + return results + + def a_div_interp(x, y): + return Executor.submit(lambda x, y: x / y, x, y) + + def a_mul_interp(x, y): + return Executor.submit(lambda x, y: x * y, x, y) + + with ( + handler(ThreadPoolFuturesInterpretation()), + handler({a_div: a_div_interp, a_mul: a_mul_interp}), + ): + assert set(client_code()) == {3 / 4, 4 * 5} + + +def test_eval_of_concurrent_terms(): + def client_code(): + # spawn two tasks in parallel + r1 = a_div(3, 4) + r2 = a_mul(3, 4) + return r1.result() + r2.result() + + def a_div_interp(x, y): + return Executor.submit(lambda x, y: x / y, x, y) + + def a_mul_interp(x, y): + return Executor.submit(lambda x, y: x * y, x, y) + + res_stx = client_code() + assert isinstance(res_stx, Term) + + with ( + handler(ThreadPoolFuturesInterpretation()), + handler({a_div: a_div_interp, a_mul: a_mul_interp}), + ): + res = client_code() + assert res == (3 / 4 + 3 * 4) + res = evaluate(res) + assert res == (3 / 4 + 3 * 4) + + +def test_context_captured_at_submission(): + def submit_work(): + return Executor.submit(lambda: add(3, 4)) + + def add_interp(x, y): + return x + y + + def add_as_mul_interp(x, y): + return x * y + + with handler(ThreadPoolFuturesInterpretation()): + with handler({add: add_interp}): + future = submit_work() + + # Retrieve result in a different context + with handler({add: add_as_mul_interp}): + result = future.result() + + # The result should be 7 (from submission context), not 12 + assert result == 7 + + # Also test retrieving result completely outside any interpretation + with ( + handler(ThreadPoolFuturesInterpretation()), + handler({add: add_interp}), + ): + future = submit_work() + + # Retrieve result outside the handler context entirely + result = future.result() + assert result == 7 + + +def test_concurrent_execution_faster_than_sequential(): + sleep_duration = 0.001 # 50ms per task + + def add_with_sleep(x, y): + # important: we must release lock here to allow concurrency + start = time.time() + with release_handler_lock(): + time.sleep(sleep_duration) + return time.time() - start + + with ( + handler(ThreadPoolFuturesInterpretation(max_workers=3)), + handler({add: add_with_sleep}), + ): + start = time.time() + + # Submit three tasks concurrently + f1 = Executor.submit(lambda: add(1, 2)) + f2 = Executor.submit(lambda: add(3, 4)) + f3 = Executor.submit(lambda: add(5, 6)) + + # Get all results + sequential_time = f1.result() + f2.result() + f3.result() + elapsed = time.time() - start + + assert elapsed < sequential_time From 42a4d8156eb606865cdbe69db8197f12143cd2c1 Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Tue, 18 Nov 2025 11:58:41 -0500 Subject: [PATCH 3/9] added futures support to LLM and refined tests some more --- effectful/handlers/futures/__init__.py | 18 +------- effectful/handlers/llm/providers.py | 37 +++++++++++++--- effectful/ops/semantics.py | 19 ++++---- effectful/ops/types.py | 4 +- tests/test_handlers_futures.py | 30 +++++++++++++ tests/test_handlers_llm_futures.py | 61 ++++++++++++++++++++++++++ 6 files changed, 134 insertions(+), 35 deletions(-) create mode 100644 tests/test_handlers_llm_futures.py diff --git a/effectful/handlers/futures/__init__.py b/effectful/handlers/futures/__init__.py index 98644e22..937240d2 100644 --- a/effectful/handlers/futures/__init__.py +++ b/effectful/handlers/futures/__init__.py @@ -9,7 +9,7 @@ import concurrent.futures as futures import functools from collections.abc import Callable, Iterable -from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass from typing import Literal @@ -181,22 +181,6 @@ def __init__(self, max_workers=None): super().__init__(ThreadPoolExecutor(max_workers=max_workers)) -class ProcessPoolFuturesInterpretation(FuturesInterpretation): - """ - Interpretation for ProcessPoolExecutor with automatic context preservation. - - Note: Context preservation across processes has limitations due to pickling constraints. - """ - - def __init__(self, max_workers=None): - """ - Initialize with a ProcessPoolExecutor. - - :param max_workers: Maximum number of worker processes (default: None, uses default from ProcessPoolExecutor) - """ - super().__init__(ProcessPoolExecutor(max_workers=max_workers)) - - type ReturnOptions = Literal["All_COMPLETED", "FIRST_COMPLETED", "FIRST_EXCEPTION"] diff --git a/effectful/handlers/llm/providers.py b/effectful/handlers/llm/providers.py index b0b38ed5..87b0856b 100644 --- a/effectful/handlers/llm/providers.py +++ b/effectful/handlers/llm/providers.py @@ -6,6 +6,7 @@ import string import typing from collections.abc import Hashable, Iterable, Mapping, Sequence +from concurrent.futures import Future from typing import Any, get_type_hints import pydantic @@ -22,6 +23,7 @@ from openai.types.responses import FunctionToolParam +from effectful.handlers.futures import Executor from effectful.handlers.llm import Template from effectful.ops.semantics import fwd from effectful.ops.syntax import ObjectInterpretation, defop, implements @@ -282,13 +284,13 @@ def __init__(self, client: openai.OpenAI, model_name: str = "gpt-4o"): self._client = client self._model_name = model_name - @implements(Template.__call__) - def _call[**P, T]( - self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs - ) -> T: - ret_type = template.__signature__.return_annotation - bound_args = template.__signature__.bind(*args, **kwargs) - bound_args.apply_defaults() + def _openai_api_call[**P, T, RT]( + self, + template: Template[P, T], + bound_args: inspect.BoundArguments, + ret_type: type[RT], + ) -> RT: + """Execute the actual OpenAI API call and decode the response.""" prompt = _OpenAIPromptFormatter().format_as_messages( template.__prompt_template__, **bound_args.arguments ) @@ -368,3 +370,24 @@ def _call[**P, T]( result = Result.model_validate_json(result_str) assert isinstance(result, Result) return result.value # type: ignore[attr-defined] + + @implements(Template.__call__) + def _call[**P, T]( + self, template: Template[P, T], *args: P.args, **kwargs: P.kwargs + ) -> T: + ret_type = template.__signature__.return_annotation + bound_args = template.__signature__.bind(*args, **kwargs) + bound_args.apply_defaults() + + # Check if return type is Future[T] + origin = typing.get_origin(ret_type) + if origin is Future: + inner_type = typing.get_args(ret_type)[0] + return Executor.submit( + self._openai_api_call, # type: ignore + template, # type: ignore + bound_args, # type: ignore + inner_type, + ) + + return self._openai_api_call(template, bound_args, ret_type) diff --git a/effectful/ops/semantics.py b/effectful/ops/semantics.py index 7b4e3143..b99c356f 100644 --- a/effectful/ops/semantics.py +++ b/effectful/ops/semantics.py @@ -49,16 +49,15 @@ def apply[**P, T](op: Operation[P, T], *args: P.args, **kwargs: P.kwargs) -> T: mul(add(1, 2), 3) """ - from effectful.internals.runtime import acquire_handler_lock, get_interpretation - - with acquire_handler_lock(): - intp = get_interpretation() - if op in intp: - return intp[op](*args, **kwargs) - elif apply in intp: - return intp[apply](op, *args, **kwargs) - else: - return op.__default_rule__(*args, **kwargs) # type: ignore + from effectful.internals.runtime import get_interpretation + + intp = get_interpretation() + if op in intp: + return intp[op](*args, **kwargs) + elif apply in intp: + return intp[apply](op, *args, **kwargs) + else: + return op.__default_rule__(*args, **kwargs) # type: ignore @defop diff --git a/effectful/ops/types.py b/effectful/ops/types.py index 015878eb..064a03d8 100644 --- a/effectful/ops/types.py +++ b/effectful/ops/types.py @@ -67,9 +67,11 @@ def __fvs_rule__(self, *args: Q.args, **kwargs: Q.kwargs) -> inspect.BoundArgume @typing.final def __call__(self, *args: Q.args, **kwargs: Q.kwargs) -> V: + from effectful.internals.runtime import acquire_handler_lock from effectful.ops.semantics import apply - return apply.__default_rule__(self, *args, **kwargs) # type: ignore + with acquire_handler_lock(): + return apply.__default_rule__(self, *args, **kwargs) # type: ignore def __repr__(self): return f"{self.__class__.__name__}({self.__name__}, {self.__signature__})" diff --git a/tests/test_handlers_futures.py b/tests/test_handlers_futures.py index de79fdf3..67e1e2a4 100644 --- a/tests/test_handlers_futures.py +++ b/tests/test_handlers_futures.py @@ -72,6 +72,36 @@ def client(x: int): assert add_calls == 10 +def test_concurrent_client_execution(): + add_calls = 0 + add_calls_interp = 0 + + def add_interp(x: int, y: int) -> int: + nonlocal add_calls + no_calls = add_calls + time.sleep(0.001) + add_calls = no_calls + 1 + return x + y + + def client(x: int): + # clients submitted to the executor ARE NOT synchronous + nonlocal add_calls_interp + no_calls = add_calls_interp + time.sleep(0.001) + add_calls_interp = no_calls + 1 + return add(x, x) + + with ( + handler(ThreadPoolFuturesInterpretation(max_workers=4)), + handler({add: add_interp}), + ): + _ = sum(Executor.map(client, list(range(10)))) + # With mutual exclusion, we're guaranteed to get exactly 10 + assert add_calls == 10 + # client is not synchronous so no guarantees. + assert add_calls_interp != 10 + + def test_release_lock_for_concurrent_io(): """Test that release_handler_lock allows concurrent I/O operations. diff --git a/tests/test_handlers_llm_futures.py b/tests/test_handlers_llm_futures.py new file mode 100644 index 00000000..d595e1bb --- /dev/null +++ b/tests/test_handlers_llm_futures.py @@ -0,0 +1,61 @@ +""" +Tests for the LLM handler Future support. + +This module tests that LLM templates with Future[T] return types +correctly submit work concurrently and decode using the inner type. +""" + +import time +from concurrent.futures import Future +from inspect import BoundArguments +from typing import Any, override + +from effectful.handlers.futures import ThreadPoolFuturesInterpretation +from effectful.handlers.llm import Template +from effectful.handlers.llm.providers import OpenAIAPIProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + + +class SlowMockLLMProvider(OpenAIAPIProvider): + """Mock provider that simulates slow LLM responses for testing concurrency.""" + + def __init__(self, response, delay: float = 0.05, mapping={}): + self.response = response + self.delay = delay + self.calls: list[tuple[Any, tuple[Any], type]] = [] + self.mapping = mapping + + @override + def _openai_api_call[T]( + self, template: Any, args: BoundArguments, retty: type[T] + ) -> T: + self.calls.append((template, args.args, retty)) + time.sleep(self.delay) + + return self.mapping.get((template, tuple(args.args)), self.response) + + +@Template.define +def hiaku(topic: str) -> Future[str]: + """Return a hiaku about {topic}.""" + raise NotHandled + + +# synchronous template for comparison +@Template.define +def hiaku_s(topic: str) -> str: + """Return a hiaku about {topic}.""" + raise NotImplementedError + + +def test_future_return_type_decodes_inner_type(): + """Test that Future[int] templates correctly decode to int.""" + ref_hiaku = "apples to oranges, oranges to pears, I don't know what a hiaku is" + mock_provider = SlowMockLLMProvider(ref_hiaku, delay=0.001) + + with handler(ThreadPoolFuturesInterpretation()), handler(mock_provider): + future = hiaku("apples") + assert isinstance(future, Future) + result = future.result() + assert result == ref_hiaku From 58bc40bbd11c6ecb581498d3d6fc9496ebfae957 Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Tue, 18 Nov 2025 12:07:19 -0500 Subject: [PATCH 4/9] removed old print call --- effectful/handlers/futures/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/effectful/handlers/futures/__init__.py b/effectful/handlers/futures/__init__.py index 937240d2..6f4257cf 100644 --- a/effectful/handlers/futures/__init__.py +++ b/effectful/handlers/futures/__init__.py @@ -242,7 +242,6 @@ def wait[T]( or any(not isinstance(t, Future) for t in fs) ): raise NotHandled - print(f"wait called with {fs} {timeout} {return_when}") return futures.wait(fs, timeout, return_when) # type: ignore From ba879e3be1f3550fcb53b4d7ff80a1646c55ce9c Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Wed, 19 Nov 2025 15:29:07 -0500 Subject: [PATCH 5/9] removed locking and updated tests --- effectful/handlers/futures/__init__.py | 8 +-- effectful/internals/runtime.py | 50 ------------------- effectful/ops/types.py | 4 +- tests/test_handlers_futures.py | 68 ++++++-------------------- 4 files changed, 16 insertions(+), 114 deletions(-) diff --git a/effectful/handlers/futures/__init__.py b/effectful/handlers/futures/__init__.py index 6f4257cf..d38aa005 100644 --- a/effectful/handlers/futures/__init__.py +++ b/effectful/handlers/futures/__init__.py @@ -119,14 +119,8 @@ def submit(self, task: Callable, *args, **kwargs) -> Future: # Capture the current interpretation context context = get_interpretation() - @functools.wraps(task) - def wrapped_task(*task_args, **task_kwargs): - # Restore the interpretation context in the worker thread - with interpreter(context): - return task(*task_args, **task_kwargs) - # Submit the wrapped task to the underlying executor - return self.executor.submit(wrapped_task, *args, **kwargs) + return self.executor.submit(interpreter(context)(task), *args, **kwargs) @implements(Executor.map) def map(self, func: Callable, *iterables, timeout=None, chunksize=1): diff --git a/effectful/internals/runtime.py b/effectful/internals/runtime.py index 0fac8850..a954f36d 100644 --- a/effectful/internals/runtime.py +++ b/effectful/internals/runtime.py @@ -6,9 +6,6 @@ from effectful.ops.syntax import defop from effectful.ops.types import Interpretation, Operation -# Global reentrant lock for mutual exclusion of handler execution -_handler_lock = threading.RLock() - class Runtime[S, T](threading.local): """Thread-local runtime for effectful interpretations.""" @@ -20,53 +17,6 @@ def __init__(self): self.interpretation = {} -def get_handler_lock() -> threading.RLock: - """Get the global handler execution lock. - - This lock ensures mutual exclusion for handler execution by default. - Use release_handler_lock() to temporarily release it for concurrent operations. - """ - return _handler_lock - - -@contextlib.contextmanager -def release_handler_lock(): - """Context manager to temporarily release the handler lock. - - Use this when performing I/O operations or other blocking calls - that should allow other threads to execute handlers concurrently. - - Example: - @defop - def llm_call(prompt: str) -> str: - with release_handler_lock(): - # HTTP request can run while other threads execute handlers - response = requests.post(url, json={"prompt": prompt}) - return response.json()["result"] - """ - lock = get_handler_lock() - lock.release() - try: - yield - finally: - lock.acquire() - - -@contextlib.contextmanager -def acquire_handler_lock(): - """Context manager to acquire the handler lock. - - This is called automatically by apply() for handler execution. - Most users won't need to call this directly. - """ - lock = get_handler_lock() - lock.acquire() - try: - yield - finally: - lock.release() - - @functools.lru_cache(maxsize=1) def get_runtime() -> Runtime: return Runtime() diff --git a/effectful/ops/types.py b/effectful/ops/types.py index 064a03d8..015878eb 100644 --- a/effectful/ops/types.py +++ b/effectful/ops/types.py @@ -67,11 +67,9 @@ def __fvs_rule__(self, *args: Q.args, **kwargs: Q.kwargs) -> inspect.BoundArgume @typing.final def __call__(self, *args: Q.args, **kwargs: Q.kwargs) -> V: - from effectful.internals.runtime import acquire_handler_lock from effectful.ops.semantics import apply - with acquire_handler_lock(): - return apply.__default_rule__(self, *args, **kwargs) # type: ignore + return apply.__default_rule__(self, *args, **kwargs) # type: ignore def __repr__(self): return f"{self.__class__.__name__}({self.__name__}, {self.__signature__})" diff --git a/tests/test_handlers_futures.py b/tests/test_handlers_futures.py index 67e1e2a4..f1df7992 100644 --- a/tests/test_handlers_futures.py +++ b/tests/test_handlers_futures.py @@ -7,13 +7,13 @@ import time from concurrent.futures import Future +from threading import RLock import effectful.handlers.futures as futures from effectful.handlers.futures import ( Executor, ThreadPoolFuturesInterpretation, ) -from effectful.internals.runtime import release_handler_lock from effectful.ops.semantics import NotHandled, defop, evaluate, handler from effectful.ops.types import Term @@ -45,11 +45,14 @@ def test_uninterp_async(): def test_mutual_exclusion(): - """Test that handler execution is mutually exclusive by default. + """Handler execution is not mutually exclusive by default, just + like any other object call. As in python, if you call a function + that may have some shared state, you must lock it as a client. Without mutual exclusion, the race condition in add_interp would cause add_calls to be less than 10. With mutual exclusion, we're guaranteed to get exactly 10 calls. + """ add_calls = 0 @@ -60,8 +63,13 @@ def add_interp(x: int, y: int) -> int: add_calls = no_calls + 1 return x + y + client_lock = RLock() + def client(x: int): - return add(x, x) + # hey, I'm running a function that may have shared state, let me lock it + with client_lock: + res = add(x, x) + return res with ( handler(ThreadPoolFuturesInterpretation(max_workers=4)), @@ -96,59 +104,12 @@ def client(x: int): handler({add: add_interp}), ): _ = sum(Executor.map(client, list(range(10)))) - # With mutual exclusion, we're guaranteed to get exactly 10 - assert add_calls == 10 + # Without mutual exclusion, we're not guaranteed to get exactly 10 + assert add_calls != 10 # client is not synchronous so no guarantees. assert add_calls_interp != 10 -def test_release_lock_for_concurrent_io(): - """Test that release_handler_lock allows concurrent I/O operations. - - This demonstrates the pattern for handlers that perform I/O and want - to allow other handlers to run concurrently during the I/O wait. - """ - from effectful.internals.runtime import release_handler_lock - - io_calls = 0 - concurrent_ios = 0 - max_concurrent = 0 - - @defop - def io_operation(x: int) -> int: - """Simulates a slow io operation""" - raise NotHandled - - def io_interp(x: int) -> int: - nonlocal io_calls, concurrent_ios, max_concurrent - # update state using lock - io_calls += 1 - - # release lock for IO - with release_handler_lock(): - concurrent_ios += 1 - max_concurrent = max(max_concurrent, concurrent_ios) - time.sleep(0.01) # Simulate I/O wait - concurrent_ios -= 1 - return x * 2 - - def client(x: int): - return io_operation(x) - - with ( - handler(ThreadPoolFuturesInterpretation(max_workers=4)), - handler({io_operation: io_interp}), - ): - results = list(Executor.map(client, list(range(10)))) - - assert io_calls == 10 - assert results == [x * 2 for x in range(10)] - # With release_handler_lock, multiple I/O operations can run concurrently - assert max_concurrent > 1, ( - f"Expected concurrent I/O, got max_concurrent={max_concurrent}" - ) - - def test_wait_several_futures(): def client_code(): results = [] @@ -234,8 +195,7 @@ def test_concurrent_execution_faster_than_sequential(): def add_with_sleep(x, y): # important: we must release lock here to allow concurrency start = time.time() - with release_handler_lock(): - time.sleep(sleep_duration) + time.sleep(sleep_duration) return time.time() - start with ( From 5c6f3c4dffe04fab84c23c9e1c46b35a1c8feadb Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Wed, 19 Nov 2025 15:48:41 -0500 Subject: [PATCH 6/9] forward args to ThreadPool Executor --- effectful/handlers/futures/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/effectful/handlers/futures/__init__.py b/effectful/handlers/futures/__init__.py index d38aa005..91969cd6 100644 --- a/effectful/handlers/futures/__init__.py +++ b/effectful/handlers/futures/__init__.py @@ -166,13 +166,13 @@ class ThreadPoolFuturesInterpretation(FuturesInterpretation): >>> print(result) # 1024 """ - def __init__(self, max_workers=None): + def __init__(self, *args, **kwargs): """ Initialize with a ThreadPoolExecutor. :param max_workers: Maximum number of worker threads (default: None, uses default from ThreadPoolExecutor) """ - super().__init__(ThreadPoolExecutor(max_workers=max_workers)) + super().__init__(ThreadPoolExecutor(*args, **kwargs)) type ReturnOptions = Literal["All_COMPLETED", "FIRST_COMPLETED", "FIRST_EXCEPTION"] From 5af3568f0d2ac623d2505056ba9cb30a2e408ef2 Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Wed, 19 Nov 2025 17:12:51 -0500 Subject: [PATCH 7/9] removed the future support inside providers --- effectful/handlers/llm/providers.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/effectful/handlers/llm/providers.py b/effectful/handlers/llm/providers.py index 87b0856b..ee33aca1 100644 --- a/effectful/handlers/llm/providers.py +++ b/effectful/handlers/llm/providers.py @@ -6,7 +6,6 @@ import string import typing from collections.abc import Hashable, Iterable, Mapping, Sequence -from concurrent.futures import Future from typing import Any, get_type_hints import pydantic @@ -23,7 +22,6 @@ from openai.types.responses import FunctionToolParam -from effectful.handlers.futures import Executor from effectful.handlers.llm import Template from effectful.ops.semantics import fwd from effectful.ops.syntax import ObjectInterpretation, defop, implements @@ -378,16 +376,4 @@ def _call[**P, T]( ret_type = template.__signature__.return_annotation bound_args = template.__signature__.bind(*args, **kwargs) bound_args.apply_defaults() - - # Check if return type is Future[T] - origin = typing.get_origin(ret_type) - if origin is Future: - inner_type = typing.get_args(ret_type)[0] - return Executor.submit( - self._openai_api_call, # type: ignore - template, # type: ignore - bound_args, # type: ignore - inner_type, - ) - return self._openai_api_call(template, bound_args, ret_type) From bd650b7be36a1350b9e7af11b06dff5f294676b2 Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Wed, 19 Nov 2025 17:14:34 -0500 Subject: [PATCH 8/9] updated tests --- tests/test_handlers_llm_futures.py | 69 ++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 14 deletions(-) diff --git a/tests/test_handlers_llm_futures.py b/tests/test_handlers_llm_futures.py index d595e1bb..4b7491a6 100644 --- a/tests/test_handlers_llm_futures.py +++ b/tests/test_handlers_llm_futures.py @@ -6,11 +6,13 @@ """ import time +from collections.abc import Callable from concurrent.futures import Future from inspect import BoundArguments from typing import Any, override -from effectful.handlers.futures import ThreadPoolFuturesInterpretation +import effectful.handlers.futures as futures +from effectful.handlers.futures import Executor, ThreadPoolFuturesInterpretation from effectful.handlers.llm import Template from effectful.handlers.llm.providers import OpenAIAPIProvider from effectful.ops.semantics import handler @@ -28,34 +30,73 @@ def __init__(self, response, delay: float = 0.05, mapping={}): @override def _openai_api_call[T]( - self, template: Any, args: BoundArguments, retty: type[T] + self, template: Template, args: BoundArguments, retty: type[T] ) -> T: self.calls.append((template, args.args, retty)) time.sleep(self.delay) - - return self.mapping.get((template, tuple(args.args)), self.response) + return self.mapping.get(template, {}).get(tuple(args.args), self.response) @Template.define -def hiaku(topic: str) -> Future[str]: +def hiaku(topic: str) -> str: """Return a hiaku about {topic}.""" raise NotHandled -# synchronous template for comparison -@Template.define -def hiaku_s(topic: str) -> str: - """Return a hiaku about {topic}.""" - raise NotImplementedError - - def test_future_return_type_decodes_inner_type(): - """Test that Future[int] templates correctly decode to int.""" + """Test that llm templates correctly decode to int, even wrapped in a future.""" ref_hiaku = "apples to oranges, oranges to pears, I don't know what a hiaku is" mock_provider = SlowMockLLMProvider(ref_hiaku, delay=0.001) with handler(ThreadPoolFuturesInterpretation()), handler(mock_provider): - future = hiaku("apples") + future = Executor.submit(hiaku, "apples") assert isinstance(future, Future) result = future.result() assert result == ref_hiaku + + +@Template.define +def generate_program(task: str) -> Callable[[int], int]: + """Generate a Python program that {task}.""" + raise NotHandled + + +def test_concurrent_program_generation(): + """Simulate concurrent LLM calls to generate Python programs and pick the best one.""" + # Mock responses for different approaches to the same task + responses = { + generate_program: { + ("implement fibonacci algorithm 0",): "def fib(n: int) -> int: return n", + ( + "implement fibonacci algorithm 1", + ): "def fib(n: int) -> int: return n * fib(n - 1)", + ( + "implement fibonacci algorithm 2", + ): "def fib(n: int) -> int: return fib(n - 2) + fib(n - 1) if n > 1 else 0", + } + } + + mock_provider = SlowMockLLMProvider( + response="print('Default')", delay=0.01, mapping=responses + ) + + user_request: str = "implement fibonacci algorithm" + + with handler(ThreadPoolFuturesInterpretation()), handler(mock_provider): + # Launch multiple LLM calls concurrently + tasks = [ + Executor.submit(generate_program, (user_request + f" {i}")) + for i in range(3) + ] + + # Collect all results as they finish + results_as_completed = (f.result() for f in futures.as_completed(tasks)) + + valid_results = [(result, len(result)) for result in results_as_completed] + + # Pick the "best" result (here: the shortest program, as a naive heuristic) + best_program = max(valid_results, key=lambda pair: pair[1])[0] + + # Assertions + assert len(valid_results) == 3 + assert best_program in set(responses[generate_program].values()) From bfb7d7a87cf9a14a29383e157529164cd2753b0f Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Wed, 19 Nov 2025 17:24:14 -0500 Subject: [PATCH 9/9] updated docstrings --- effectful/handlers/futures/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/effectful/handlers/futures/__init__.py b/effectful/handlers/futures/__init__.py index 91969cd6..625d5d4b 100644 --- a/effectful/handlers/futures/__init__.py +++ b/effectful/handlers/futures/__init__.py @@ -42,9 +42,8 @@ def submit[**P, T]( >>> from effectful.handlers.futures import ThreadPoolFuturesInterpretation >>> from effectful.ops.semantics import handler >>> - >>> pool = ThreadPoolExecutor() - >>> with handler(ThreadPoolFuturesInterpretation(pool)): - >>> future = Executor.submit(my_function, arg1, arg2) + >>> with handler(ThreadPoolFuturesInterpretation()): + >>> future = Executor.submit(lambda x,y: x + y, 1, 2) """ raise NotHandled @@ -153,16 +152,17 @@ class ThreadPoolFuturesInterpretation(FuturesInterpretation): Example: >>> from concurrent.futures import ThreadPoolExecutor, Future - >>> from effectful.ops.semantics import defop, handler + >>> from effectful.ops.syntax import defop + >>> from effectful.ops.semantics import handler >>> from effectful.handlers.futures import Executor, ThreadPoolFuturesInterpretation >>> >>> @defop - >>> def async_pow(n: int, k: int) -> Future[int]: + >>> def pow(n: int, k: int) -> Future[int]: >>> return Executor.submit(pow, n, k) >>> >>> pool = ThreadPoolExecutor() >>> with handler(ThreadPoolFuturesInterpretation(pool)): - >>> result = async_pow(2, 10).result() + >>> result = pow(2, 10).result() >>> print(result) # 1024 """