From 22540d99d01cb6dc4928deb2e8f25f66c09befec Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Tue, 25 Nov 2025 09:32:13 -0500 Subject: [PATCH 1/2] implemented reduced k-ahead sampler --- effectful/handlers/llm/sampling.py | 60 +++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/effectful/handlers/llm/sampling.py b/effectful/handlers/llm/sampling.py index effc197d..7e8f1fb0 100644 --- a/effectful/handlers/llm/sampling.py +++ b/effectful/handlers/llm/sampling.py @@ -1,4 +1,5 @@ -from collections import Counter +from collections import Counter, defaultdict +from collections.abc import Callable from concurrent import futures from concurrent.futures.thread import ThreadPoolExecutor @@ -45,3 +46,60 @@ def n_votes_ahead(): tasks.append(executor.submit(interpreter(intp)(fwd), *args, **kwargs)) executor.shutdown() return self.votes.most_common(1)[0][0] + + +class ReducedKAheadSampler[**P, T, K](ObjectInterpretation): + """KAheadSampler for LLM calls, where votes are generated from LLM outputs.""" + + no_voters: int + k: int + """Number of votes ahead before an answer is accepted""" + + votes: Counter[K] = Counter() + results: dict[K, set[T]] = defaultdict(set) + reducer: Callable[[T], K] + select_best: Callable[[set[T]], T] + + def __init__( + self, + reducer: Callable[[T], K], + select_best: Callable[[set[T]], T] = lambda s: next(iter(s)), + no_voters: int = 6, + k: int = 3, + ): + self.no_voters = no_voters + self.k = k + self.reducer = reducer + self.select_best = select_best + + @implements(Template.__call__) + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: + executor = ThreadPoolExecutor() + intp = get_interpretation() + tasks = [ + executor.submit(interpreter(intp)(fwd), *args, **kwargs) + for _ in range(self.no_voters) + ] + + def n_votes_ahead(): + match self.votes.most_common(2): + case [[_, v1], [_, v2]]: + return v1 >= v2 + self.k + case [[_, v1]]: + return v1 >= self.k + case _: + return False + + while not n_votes_ahead(): + done, remain = futures.wait(tasks, return_when=futures.FIRST_COMPLETED) + tasks = list(remain) + for fut in done: + res = fut.result() + vote = self.reducer(res) + self.votes[vote] += 1 + self.results[vote].add(res) + tasks.append(executor.submit(interpreter(intp)(fwd), *args, **kwargs)) + executor.shutdown() + vote = self.votes.most_common(1)[0][0] + res = self.select_best(self.results[vote]) + return res From 01e037fb30c0bf9687ea115567c9744b629ddd8e Mon Sep 17 00:00:00 2001 From: Kiran Gopinathan Date: Tue, 25 Nov 2025 09:33:57 -0500 Subject: [PATCH 2/2] changed to list instead --- effectful/handlers/llm/sampling.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/effectful/handlers/llm/sampling.py b/effectful/handlers/llm/sampling.py index 7e8f1fb0..1778d3b2 100644 --- a/effectful/handlers/llm/sampling.py +++ b/effectful/handlers/llm/sampling.py @@ -56,14 +56,14 @@ class ReducedKAheadSampler[**P, T, K](ObjectInterpretation): """Number of votes ahead before an answer is accepted""" votes: Counter[K] = Counter() - results: dict[K, set[T]] = defaultdict(set) + results: dict[K, list[T]] = defaultdict(list) reducer: Callable[[T], K] - select_best: Callable[[set[T]], T] + select_best: Callable[[list[T]], T] def __init__( self, reducer: Callable[[T], K], - select_best: Callable[[set[T]], T] = lambda s: next(iter(s)), + select_best: Callable[[list[T]], T] = lambda s: next(iter(s)), no_voters: int = 6, k: int = 3, ): @@ -97,7 +97,7 @@ def n_votes_ahead(): res = fut.result() vote = self.reducer(res) self.votes[vote] += 1 - self.results[vote].add(res) + self.results[vote].append(res) tasks.append(executor.submit(interpreter(intp)(fwd), *args, **kwargs)) executor.shutdown() vote = self.votes.most_common(1)[0][0]