-
Notifications
You must be signed in to change notification settings - Fork 3
Alternative proposal for concurrent.futures support in effectful
#400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
kiranandcode
wants to merge
9
commits into
staging-llm
Choose a base branch
from
kg-futures-proposal
base: staging-llm
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
6fde909
initial commit
kiranandcode 3dd752d
added thread safety and test
kiranandcode 42a4d81
added futures support to LLM and refined tests some more
kiranandcode 58bc40b
removed old print call
kiranandcode ba879e3
removed locking and updated tests
kiranandcode 5c6f3c4
forward args to ThreadPool Executor
kiranandcode 5af3568
removed the future support inside providers
kiranandcode bd650b7
updated tests
kiranandcode bfb7d7a
updated docstrings
kiranandcode File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,249 @@ | ||
| """ | ||
| Futures handler for effectful - provides integration with concurrent.futures. | ||
|
|
||
| This module provides operations for working with concurrent.futures, allowing | ||
| effectful operations to be executed asynchronously in thread pools with | ||
| automatic preservation of interpretation context. | ||
| """ | ||
|
|
||
| import concurrent.futures as futures | ||
| import functools | ||
| from collections.abc import Callable, Iterable | ||
| from concurrent.futures import Future, ThreadPoolExecutor | ||
| from dataclasses import dataclass | ||
| from typing import Literal | ||
|
|
||
| from effectful.ops.semantics import defop | ||
| from effectful.ops.syntax import ObjectInterpretation, defdata, implements | ||
| from effectful.ops.types import NotHandled, Term | ||
|
|
||
|
|
||
| class Executor: | ||
| """Namespace for executor-related operations.""" | ||
|
|
||
| @staticmethod | ||
| @defop # type: ignore | ||
| def submit[**P, T]( | ||
| task: Callable[P, T], *args: P.args, **kwargs: P.kwargs | ||
| ) -> Future[T]: | ||
| """ | ||
| Submit a task for asynchronous execution. | ||
|
|
||
| This operation should be handled by providing a FuturesInterpretation | ||
| which automatically preserves the interpretation context across thread boundaries. | ||
|
|
||
| :param task: The callable to execute asynchronously | ||
| :param args: Positional arguments for the task | ||
| :param kwargs: Keyword arguments for the task | ||
| :return: A Future representing the asynchronous computation | ||
|
|
||
| Example: | ||
| >>> from concurrent.futures import ThreadPoolExecutor | ||
| >>> from effectful.handlers.futures import ThreadPoolFuturesInterpretation | ||
| >>> from effectful.ops.semantics import handler | ||
| >>> | ||
| >>> with handler(ThreadPoolFuturesInterpretation()): | ||
| >>> future = Executor.submit(lambda x,y: x + y, 1, 2) | ||
| """ | ||
| raise NotHandled | ||
|
|
||
| @staticmethod | ||
| @defop | ||
| def map[T, R]( | ||
| func: Callable[[T], R], | ||
| *iterables: Iterable[T], | ||
| timeout: float | None = None, | ||
| chunksize: int = 1, | ||
| ) -> Iterable[R]: | ||
| """ | ||
| Map a function over iterables, executing asynchronously. | ||
|
|
||
| Returns an iterator yielding results as they complete. Equivalent to | ||
| map(func, *iterables) but executes asynchronously. | ||
|
|
||
| This operation should be handled by providing a FuturesInterpretation | ||
| which automatically preserves the interpretation context across thread boundaries. | ||
|
|
||
| :param func: The function to map over the iterables | ||
| :param iterables: One or more iterables to map over | ||
| :param timeout: Maximum time to wait for a result (default: None) | ||
| :param chunksize: Size of chunks for ProcessPoolExecutor (default: 1) | ||
| :return: An iterator yielding results | ||
|
|
||
| Example: | ||
| >>> from effectful.handlers.futures import ThreadPoolFuturesInterpretation | ||
| >>> from effectful.ops.semantics import handler | ||
| >>> | ||
| >>> def square(x): | ||
| >>> return x ** 2 | ||
| >>> | ||
| >>> with handler(ThreadPoolFuturesInterpretation()): | ||
| >>> results = list(Executor.map(square, range(10))) | ||
| >>> print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] | ||
| """ | ||
| raise NotHandled | ||
|
|
||
|
|
||
| class FuturesInterpretation(ObjectInterpretation): | ||
| """ | ||
| Base interpretation for concurrent.futures executors. | ||
|
|
||
| This interpretation automatically preserves the effectful interpretation context | ||
| when submitting tasks to worker threads, ensuring that effectful operations | ||
| work correctly across thread boundaries. | ||
| """ | ||
|
|
||
| def __init__(self, executor: futures.Executor): | ||
| """ | ||
| Initialize the futures interpretation. | ||
|
|
||
| :param executor: The executor to use (ThreadPoolExecutor or ProcessPoolExecutor) | ||
| """ | ||
| super().__init__() | ||
| self.executor: futures.Executor = executor | ||
|
|
||
| def shutdown(self, *args, **kwargs): | ||
| self.executor.shutdown(*args, **kwargs) | ||
|
|
||
| @implements(Executor.submit) | ||
| def submit(self, task: Callable, *args, **kwargs) -> Future: | ||
| """ | ||
| Submit a task to the executor with automatic context preservation. | ||
|
|
||
| Captures the current interpretation context and ensures it is restored | ||
| in the worker thread before executing the task. | ||
| """ | ||
| from effectful.internals.runtime import get_interpretation, interpreter | ||
|
|
||
| # Capture the current interpretation context | ||
| context = get_interpretation() | ||
|
|
||
| # Submit the wrapped task to the underlying executor | ||
| return self.executor.submit(interpreter(context)(task), *args, **kwargs) | ||
|
|
||
| @implements(Executor.map) | ||
| def map(self, func: Callable, *iterables, timeout=None, chunksize=1): | ||
| """ | ||
| Map a function over iterables with automatic context preservation. | ||
|
|
||
| Captures the current interpretation context and ensures it is restored | ||
| in each worker thread before executing the function. | ||
| """ | ||
| from effectful.internals.runtime import get_interpretation, interpreter | ||
|
|
||
| # Capture the current interpretation context | ||
| context = get_interpretation() | ||
|
|
||
| @functools.wraps(func) | ||
| def wrapped_func(*args, **kwargs): | ||
| # Restore the interpretation context in the worker thread | ||
| with interpreter(context): | ||
| return func(*args, **kwargs) | ||
|
|
||
| # Call the executor's map with the wrapped function | ||
| return self.executor.map( | ||
| wrapped_func, *iterables, timeout=timeout, chunksize=chunksize | ||
| ) | ||
|
|
||
|
|
||
| class ThreadPoolFuturesInterpretation(FuturesInterpretation): | ||
| """ | ||
| Interpretation for ThreadPoolExecutor with automatic context preservation. | ||
|
|
||
| Example: | ||
| >>> from concurrent.futures import ThreadPoolExecutor, Future | ||
| >>> from effectful.ops.syntax import defop | ||
| >>> from effectful.ops.semantics import handler | ||
| >>> from effectful.handlers.futures import Executor, ThreadPoolFuturesInterpretation | ||
| >>> | ||
| >>> @defop | ||
| >>> def pow(n: int, k: int) -> Future[int]: | ||
| >>> return Executor.submit(pow, n, k) | ||
| >>> | ||
| >>> pool = ThreadPoolExecutor() | ||
| >>> with handler(ThreadPoolFuturesInterpretation(pool)): | ||
| >>> result = pow(2, 10).result() | ||
| >>> print(result) # 1024 | ||
| """ | ||
|
|
||
| def __init__(self, *args, **kwargs): | ||
| """ | ||
| Initialize with a ThreadPoolExecutor. | ||
|
|
||
| :param max_workers: Maximum number of worker threads (default: None, uses default from ThreadPoolExecutor) | ||
| """ | ||
| super().__init__(ThreadPoolExecutor(*args, **kwargs)) | ||
|
|
||
|
|
||
| type ReturnOptions = Literal["All_COMPLETED", "FIRST_COMPLETED", "FIRST_EXCEPTION"] | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class DoneAndNotDoneFutures[T]: | ||
| done: set[Future[T]] | ||
| not_done: set[Future[T]] | ||
|
|
||
|
|
||
| @defdata.register(DoneAndNotDoneFutures) | ||
| class _DoneAndNotDoneFuturesTerm[T](Term[DoneAndNotDoneFutures[T]]): | ||
| """Term representing a DoneAndNotDoneFutures result.""" | ||
|
|
||
| def __init__(self, op, *args, **kwargs): | ||
| self._op = op | ||
| self._args = args | ||
| self._kwargs = kwargs | ||
|
|
||
| @property | ||
| def op(self): | ||
| return self._op | ||
|
|
||
| @property | ||
| def args(self): | ||
| return self._args | ||
|
|
||
| @property | ||
| def kwargs(self): | ||
| return self._kwargs | ||
|
|
||
| @defop # type: ignore[prop-decorator] | ||
| @property | ||
| def done(self) -> set[Future[T]]: | ||
| """Get the set of done futures.""" | ||
| if not isinstance(self, Term): | ||
| return self.done | ||
| else: | ||
| raise NotHandled | ||
|
|
||
| @defop # type: ignore[prop-decorator] | ||
| @property | ||
| def not_done(self) -> set[Future[T]]: | ||
| """Get the set of not done futures.""" | ||
| if not isinstance(self, Term): | ||
| return self.not_done | ||
| else: | ||
| raise NotHandled | ||
|
|
||
|
|
||
| @defop | ||
| def wait[T]( | ||
| fs: Iterable[Future[T]], | ||
| timeout: int | None = None, | ||
| return_when: ReturnOptions = futures.ALL_COMPLETED, # type: ignore | ||
| ) -> DoneAndNotDoneFutures[T]: | ||
| if ( | ||
| isinstance(timeout, Term) | ||
| or isinstance(return_when, Term) | ||
| or any(not isinstance(t, Future) for t in fs) | ||
| ): | ||
| raise NotHandled | ||
| return futures.wait(fs, timeout, return_when) # type: ignore | ||
|
|
||
|
|
||
| @defop | ||
| def as_completed[T]( | ||
| fs: Iterable[Future[T]], | ||
| timeout: int | None = None, | ||
| ) -> Iterable[Future[T]]: | ||
| if isinstance(timeout, Term) or any(isinstance(t, Term) for t in fs): | ||
| raise NotHandled | ||
| return futures.as_completed(fs, timeout) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this defined in the concurrent.futures library? Do we need our own dataclass definition?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a namedtuple in the library, I added this dataclass because
@defop ...using the namedtuple type was not allowing.doneiirc (or there could have been another issue that was causing this to fail and I misdiagnosed the cause)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There shouldn't be a behavioral difference in
effectfulbetween the dataclass and the namedtuple for this simple case, so it's probably worth creating a self-contained test that passes with the dataclass and fails with the namedtuple so that we can try to fix it.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yep will do!