From 351848b3fa40917ff3b9d74601c8693017c71fc1 Mon Sep 17 00:00:00 2001 From: hntrl Date: Sat, 12 Apr 2025 14:05:31 -0600 Subject: [PATCH 1/6] feat(eventkit): `SingletonObservable` --- packages/eventkit/lib/index.ts | 2 + packages/eventkit/lib/singleton.ts | 83 +++++++++++++++++++++++++++ packages/eventkit/lib/utils/errors.ts | 13 +++++ 3 files changed, 98 insertions(+) create mode 100644 packages/eventkit/lib/singleton.ts diff --git a/packages/eventkit/lib/index.ts b/packages/eventkit/lib/index.ts index ff6def7..24c8b25 100644 --- a/packages/eventkit/lib/index.ts +++ b/packages/eventkit/lib/index.ts @@ -4,6 +4,8 @@ export * from "./stream"; export * from "./utils/errors"; +export { SingletonAsyncObservable } from "./singleton"; + export { // @eventkit/async-observable/from type AsyncObservableInputType, diff --git a/packages/eventkit/lib/singleton.ts b/packages/eventkit/lib/singleton.ts new file mode 100644 index 0000000..7aa47c0 --- /dev/null +++ b/packages/eventkit/lib/singleton.ts @@ -0,0 +1,83 @@ +import { AsyncObservable, Subscriber } from "@eventkit/async-observable"; + +import { NoValuesError } from "./utils/errors"; + +/** + * An extension of AsyncObservable that implements PromiseLike, allowing it to be used with + * await syntax. + * + * This class is designed for observables that are expected to emit exactly one value, such as + * those created by operators like `reduce` or `count`. It provides a convenient way to await + * the single emitted value without having to manually set up a subscription. + * + * @example + * ```ts + * // instead of + * let value: T | undefined; + * await observable.subscribe((v) => { + * // this observable only emits one value, so this will only be called once + * value = v; + * }); + * + * // you can do + * const value: T = await observable; + * + * // like with `reduce` for instance + * const result = await from([1, 2, 3]).pipe(reduce((acc, val) => acc + val, 0)); + * console.log(result); // 6 + * ``` + * + * @template T - The type of the value emitted by the observable + */ +export class SingletonAsyncObservable extends AsyncObservable implements PromiseLike { + /** + * Returns a promise that will subscribe to the observable and resolve when the subscriber emits + * its first value. This is useful in cases where you know the observable will emit one and only + * one value (like the result of a `reduce` or `count` operator), but you want to wait for the + * value to be emitted using `await` syntax instead of a subscription callback. + * + * When the first value is emitted, the subscriber will immediately be cancelled and all cleanup + * work will be performed before the promise resolves. + * + * @throws {NoValuesError} If the observable completes without emitting any values. + * + * @param onfulfilled Optional callback to execute when the promise resolves successfully + * @param onrejected Optional callback to execute when the promise rejects with an error + * @returns A promise that resolves with the result of the onfulfilled/onrejected handlers + */ + then( + onfulfilled?: ((value: T) => TResult1 | PromiseLike) | null, + onrejected?: ((reason: any) => TResult2 | PromiseLike) | null + ): Promise { + const sub = new Subscriber(this); + return Promise.race([ + // Race against the return signal to see if the observable completed without emitting any + // values. + sub._returnSignal.then(() => { + throw new NoValuesError(); + }), + // Race against the first value to be emitted by the observable. + sub.next(), + ]) + .then(async (result) => { + await sub.cancel(); + return result.value; + }) + .then(onfulfilled, onrejected); + } +} + +/** + * Reconstructs a SingletonAsyncObservable from an existing AsyncObservable. We use this in + * operators to intrinsically return singletons that retains the operator behavior of the source + * without introducing the concept of a singleton to async-observable + * + * @param source The AsyncObservable to reconstruct + * @returns A SingletonAsyncObservable that is a composition of the source and the provided operator + * @internal + */ +export function singletonFrom(source: AsyncObservable): SingletonAsyncObservable { + const observable = new SingletonAsyncObservable(source._generator); + observable._scheduler = source._scheduler; + return observable; +} diff --git a/packages/eventkit/lib/utils/errors.ts b/packages/eventkit/lib/utils/errors.ts index 3502404..e6a59d3 100644 --- a/packages/eventkit/lib/utils/errors.ts +++ b/packages/eventkit/lib/utils/errors.ts @@ -27,3 +27,16 @@ export class InvalidConcurrencyLimitError extends Error { this.name = "InvalidConcurrencyLimitError"; } } + +/** + * An error thrown when an observable completes without emitting any values. + * + * @group Errors + */ +export class NoValuesError extends Error { + /** @internal */ + constructor() { + super("no values"); + this.name = "NoValuesError"; + } +} From f77d0e31bb9582c9c76ea3a24b719397334b0dfd Mon Sep 17 00:00:00 2001 From: hntrl Date: Sat, 12 Apr 2025 14:05:58 -0600 Subject: [PATCH 2/6] feat(eventkit): make compatible operators into singletons --- packages/eventkit/lib/operators/count.ts | 25 +++++++++------- packages/eventkit/lib/operators/elementAt.ts | 31 ++++++++++++-------- packages/eventkit/lib/operators/reduce.ts | 27 +++++++++-------- packages/eventkit/lib/utils/types.ts | 13 +++++++- 4 files changed, 59 insertions(+), 37 deletions(-) diff --git a/packages/eventkit/lib/operators/count.ts b/packages/eventkit/lib/operators/count.ts index f27378b..49ffe95 100644 --- a/packages/eventkit/lib/operators/count.ts +++ b/packages/eventkit/lib/operators/count.ts @@ -1,4 +1,5 @@ -import { type OperatorFunction } from "@eventkit/async-observable"; +import { singletonFrom } from "../singleton"; +import { type SingletonOperatorFunction } from "../utils/types"; /** * Counts the number of items emitted by the source observable, and emits that @@ -12,17 +13,19 @@ import { type OperatorFunction } from "@eventkit/async-observable"; */ export function count( predicate?: (value: T, index: number) => boolean -): OperatorFunction { +): SingletonOperatorFunction { predicate = predicate ?? (() => true); return (source) => - new source.AsyncObservable(async function* () { - let index = 0; - let count = 0; - for await (const value of source) { - if (predicate(value, index++)) { - count++; + singletonFrom( + new source.AsyncObservable(async function* () { + let index = 0; + let count = 0; + for await (const value of source) { + if (predicate(value, index++)) { + count++; + } } - } - yield count; - }); + yield count; + }) + ); } diff --git a/packages/eventkit/lib/operators/elementAt.ts b/packages/eventkit/lib/operators/elementAt.ts index beae857..42239c7 100644 --- a/packages/eventkit/lib/operators/elementAt.ts +++ b/packages/eventkit/lib/operators/elementAt.ts @@ -1,6 +1,6 @@ -import { type OperatorFunction } from "@eventkit/async-observable"; - +import { singletonFrom } from "../singleton"; import { ArgumentOutOfRangeError } from "../utils/errors"; +import { type SingletonOperatorFunction } from "../utils/types"; /** * Emits the single value at the specified `index` in the source observable, or a default value @@ -15,20 +15,25 @@ import { ArgumentOutOfRangeError } from "../utils/errors"; * @param defaultValue The default value returned for missing indices. * @group Operators */ -export function elementAt(index: number, defaultValue?: D): OperatorFunction { +export function elementAt( + index: number, + defaultValue?: D +): SingletonOperatorFunction { if (index < 0) { throw new ArgumentOutOfRangeError(); } return (source) => - new source.AsyncObservable(async function* () { - let i = 0; - for await (const value of source) { - if (i++ === index) { - yield value; - return; + singletonFrom( + new source.AsyncObservable(async function* () { + let i = 0; + for await (const value of source) { + if (i++ === index) { + yield value; + return; + } } - } - if (defaultValue) yield defaultValue; - else throw new ArgumentOutOfRangeError(); - }); + if (defaultValue) yield defaultValue; + else throw new ArgumentOutOfRangeError(); + }) + ); } diff --git a/packages/eventkit/lib/operators/reduce.ts b/packages/eventkit/lib/operators/reduce.ts index 087ab49..44c047d 100644 --- a/packages/eventkit/lib/operators/reduce.ts +++ b/packages/eventkit/lib/operators/reduce.ts @@ -1,4 +1,5 @@ -import { type OperatorFunction } from "@eventkit/async-observable"; +import { singletonFrom } from "../singleton"; +import { type SingletonOperatorFunction } from "../utils/types"; /** * Applies an accumulator function over the source generator, and returns the @@ -18,23 +19,25 @@ import { type OperatorFunction } from "@eventkit/async-observable"; export function reduce( accumulator: (acc: A, value: V, index: number) => A, seed: A -): OperatorFunction; +): SingletonOperatorFunction; export function reduce( accumulator: (acc: A | undefined, value: V, index: number) => A, seed?: A -): OperatorFunction; +): SingletonOperatorFunction; export function reduce( accumulator: (acc: A | undefined, value: V, index: number) => A, seed?: A -): OperatorFunction { +): SingletonOperatorFunction { const hasSeed = arguments.length >= 2; return (source) => - new source.AsyncObservable(async function* (this: any) { - let acc = hasSeed ? seed : undefined; - let index = 0; - for await (const value of source) { - acc = accumulator(acc as A | undefined, value, index++); - } - yield acc as A; - }); + singletonFrom( + new source.AsyncObservable(async function* (this: any) { + let acc = hasSeed ? seed : undefined; + let index = 0; + for await (const value of source) { + acc = accumulator(acc as A | undefined, value, index++); + } + yield acc as A; + }) + ); } diff --git a/packages/eventkit/lib/utils/types.ts b/packages/eventkit/lib/utils/types.ts index e1c5b85..0ee8512 100644 --- a/packages/eventkit/lib/utils/types.ts +++ b/packages/eventkit/lib/utils/types.ts @@ -1,4 +1,10 @@ -import { type AsyncObservableInput } from "@eventkit/async-observable"; +import { + type UnaryFunction, + type AsyncObservable, + type AsyncObservableInput, +} from "@eventkit/async-observable"; + +import { type SingletonAsyncObservable } from "../singleton"; /** * A simple type to represent a gamut of "falsy" values... with a notable exception: @@ -12,3 +18,8 @@ export type TruthyTypesOf = T extends Falsy ? never : T; export type AsyncObservableInputTuple = { [K in keyof T]: AsyncObservableInput; }; + +export type SingletonOperatorFunction = UnaryFunction< + AsyncObservable, + SingletonAsyncObservable +>; From 879710e254c8144a201049a3438f2e22b6693fd8 Mon Sep 17 00:00:00 2001 From: hntrl Date: Sat, 12 Apr 2025 14:06:14 -0600 Subject: [PATCH 3/6] chore(eventkit): add singleton operator tests --- .../__tests__/operators/count.spec.ts | 30 ++++++++++ .../__tests__/operators/elementAt.spec.ts | 31 ++++++++++ .../__tests__/operators/reduce.spec.ts | 58 +++++++++++++++++++ 3 files changed, 119 insertions(+) diff --git a/packages/eventkit/__tests__/operators/count.spec.ts b/packages/eventkit/__tests__/operators/count.spec.ts index 24505d3..907fda9 100644 --- a/packages/eventkit/__tests__/operators/count.spec.ts +++ b/packages/eventkit/__tests__/operators/count.spec.ts @@ -27,6 +27,11 @@ describe("count", () => { await sub; expect(completionSpy).toHaveBeenCalledTimes(1); }); + + it("should emit final count using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3]); + expect(await source.pipe(count())).toEqual(3); + }); }); describe("when source emits no values", () => { @@ -40,6 +45,11 @@ describe("count", () => { expect(result).toEqual([0]); }); + + it("should emit 0 using singleton object", async () => { + const source = AsyncObservable.from([]); + expect(await source.pipe(count())).toEqual(0); + }); }); describe("when source emits multiple values", () => { @@ -72,6 +82,11 @@ describe("count", () => { expect(indexSpy).toHaveBeenNthCalledWith(2, 1); expect(indexSpy).toHaveBeenNthCalledWith(3, 2); }); + + it("should emit final count using singleton object", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + expect(await source.pipe(count())).toEqual(3); + }); }); describe("when predicate is provided", () => { @@ -116,6 +131,11 @@ describe("count", () => { expect(result).toEqual([0]); // No values counted }); + + it("should pass final count for values that satisfy predicate using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + expect(await source.pipe(count((value) => value % 2 === 0))).toEqual(2); + }); }); describe("when source errors", () => { @@ -136,5 +156,15 @@ describe("count", () => { expect(capturedError).toBe(error); }); + + it("should propagate error when using singleton object", async () => { + const error = new Error("test error"); + const source = new AsyncObservable(async function* () { + yield 1; + await delay(5); + throw error; + }); + await expect(source.pipe(count())).rejects.toThrow(error); + }); }); }); diff --git a/packages/eventkit/__tests__/operators/elementAt.spec.ts b/packages/eventkit/__tests__/operators/elementAt.spec.ts index 2d98a99..64d119e 100644 --- a/packages/eventkit/__tests__/operators/elementAt.spec.ts +++ b/packages/eventkit/__tests__/operators/elementAt.spec.ts @@ -38,6 +38,11 @@ describe("elementAt", () => { expect(nextSpy).toHaveBeenCalledTimes(1); expect(nextSpy).toHaveBeenCalledWith(30); }); + + it("should emit value at specified index using singleton object", async () => { + const source = AsyncObservable.from(["a", "b", "c", "d"]); + expect(await source.pipe(elementAt(2))).toEqual("c"); + }); }); describe("when index is out of range", () => { @@ -65,6 +70,11 @@ describe("elementAt", () => { expect(capturedError).toBeInstanceOf(ArgumentOutOfRangeError); }); + + it("should emit default value if no default value is provided using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3]); + await expect(source.pipe(elementAt(10))).rejects.toThrow(ArgumentOutOfRangeError); + }); }); describe("when index is negative", () => { @@ -102,6 +112,11 @@ describe("elementAt", () => { expect(capturedError).toBeInstanceOf(ArgumentOutOfRangeError); }); + + it("should emit default value if no default value is provided using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3]); + await expect(source.pipe(elementAt(5))).rejects.toThrow(ArgumentOutOfRangeError); + }); }); describe("when source errors", () => { @@ -122,6 +137,17 @@ describe("elementAt", () => { expect(capturedError).toBe(error); }); + + it("should propagate error when using singleton object", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + yield 2; + await delay(5); + throw error; + }); + await expect(source.pipe(elementAt(3))).rejects.toThrow(error); + }); }); describe("when source emits multiple values", () => { @@ -150,5 +176,10 @@ describe("elementAt", () => { expect(nextSpy).toHaveBeenCalledTimes(1); expect(nextSpy).toHaveBeenCalledWith("c"); }); + + it("should emit value at specified index using singleton object", async () => { + const source = AsyncObservable.from(["a", "b", "c", "d", "e"]); + expect(await source.pipe(elementAt(2))).toEqual("c"); + }); }); }); diff --git a/packages/eventkit/__tests__/operators/reduce.spec.ts b/packages/eventkit/__tests__/operators/reduce.spec.ts index b573c59..fec5e39 100644 --- a/packages/eventkit/__tests__/operators/reduce.spec.ts +++ b/packages/eventkit/__tests__/operators/reduce.spec.ts @@ -27,6 +27,11 @@ describe("reduce", () => { await sub; expect(completionSpy).toHaveBeenCalledTimes(1); }); + + it("should emit final accumulated value using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3, 4]); + expect(await source.pipe(reduce((acc, value) => acc + value, 0))).toEqual(10); + }); }); describe("when seed value is provided", () => { @@ -52,6 +57,11 @@ describe("reduce", () => { expect(accumulatorSpy).toHaveBeenCalledTimes(3); expect(accumulatorSpy).toHaveBeenNthCalledWith(1, 5, 1, 0); }); + + it("should emit final accumulated value using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3]); + expect(await source.pipe(reduce((acc, value) => acc + value, 5))).toEqual(11); + }); }); describe("when no seed value is provided", () => { @@ -81,6 +91,13 @@ describe("reduce", () => { expect(result).toEqual(["abc"]); }); + + it("should emit final accumulated value using singleton object", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + expect( + await source.pipe(reduce((acc, value) => (acc || "") + value)) + ).toEqual("abc"); + }); }); describe("when source emits multiple values", () => { @@ -121,6 +138,13 @@ describe("reduce", () => { expect(indexSpy).toHaveBeenNthCalledWith(2, 1); expect(indexSpy).toHaveBeenNthCalledWith(3, 2); }); + + it("should emit final accumulated value using singleton object", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + expect( + await source.pipe(reduce((acc, value) => (acc || "") + value)) + ).toEqual("abc"); + }); }); describe("when source emits no values", () => { @@ -143,6 +167,11 @@ describe("reduce", () => { await source.pipe(reduce(accumulatorSpy)).subscribe(() => {}); expect(accumulatorSpy).not.toHaveBeenCalled(); }); + + it("should emit seed value using singleton object", async () => { + const source = AsyncObservable.from([]); + expect(await source.pipe(reduce((acc, value) => acc + value, 42))).toEqual(42); + }); }); describe("when accumulator throws error", () => { @@ -187,6 +216,19 @@ describe("reduce", () => { expect(nextSpy).not.toHaveBeenCalled(); }); + + it("should propagate error when using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const error = new Error("accumulator error"); + await expect( + source.pipe( + reduce((acc, value) => { + if (value === 2) throw error; + return acc + value; + }, 0) + ) + ).rejects.toThrow(error); + }); }); describe("when source errors", () => { @@ -227,6 +269,17 @@ describe("reduce", () => { expect(nextSpy).not.toHaveBeenCalled(); }); + + it("should propagate error when using singleton object", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + yield 2; + await delay(5); + throw error; + }); + await expect(source.pipe(reduce((acc, value) => acc + value, 0))).rejects.toThrow(error); + }); }); describe("when accumulator returns undefined", () => { @@ -252,5 +305,10 @@ describe("reduce", () => { expect(accumulatorSpy).toHaveBeenNthCalledWith(2, undefined, 2, 1); expect(accumulatorSpy).toHaveBeenNthCalledWith(3, undefined, 3, 2); }); + + it("should handle undefined as valid result using singleton object", async () => { + const source = AsyncObservable.from([1, 2, 3]); + expect(await source.pipe(reduce(() => undefined))).toEqual(undefined); + }); }); }); From e9bae106a573e658960712989a940fd06a13de2b Mon Sep 17 00:00:00 2001 From: hntrl Date: Sat, 12 Apr 2025 14:20:07 -0600 Subject: [PATCH 4/6] chore(eventkit): add singleton tests --- packages/eventkit/__tests__/singleton.test.ts | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 packages/eventkit/__tests__/singleton.test.ts diff --git a/packages/eventkit/__tests__/singleton.test.ts b/packages/eventkit/__tests__/singleton.test.ts new file mode 100644 index 0000000..5983f43 --- /dev/null +++ b/packages/eventkit/__tests__/singleton.test.ts @@ -0,0 +1,144 @@ +import { describe, it, expect, vi } from "vitest"; +import { AsyncObservable } from "@eventkit/async-observable"; +import { SingletonAsyncObservable, singletonFrom } from "../lib/singleton"; +import { NoValuesError } from "../lib/utils/errors"; + +describe("SingletonAsyncObservable", () => { + describe("constructor", () => { + it("should create a new SingletonAsyncObservable instance", async () => { + const singleton = new SingletonAsyncObservable(async function* () { + yield 42; + }); + expect(singleton).toBeInstanceOf(SingletonAsyncObservable); + }); + + it("should inherit from AsyncObservable", async () => { + const singleton = new SingletonAsyncObservable(async function* () { + yield 42; + }); + expect(singleton).toBeInstanceOf(AsyncObservable); + }); + }); + + describe("then method", () => { + it("should implement PromiseLike interface", async () => { + const singleton = new SingletonAsyncObservable(async function* () { + yield 42; + }); + expect(typeof singleton.then).toBe("function"); + }); + + it("should resolve with the first emitted value", async () => { + const singleton = new SingletonAsyncObservable(async function* () { + yield 1; + yield 2; + yield 3; + }); + const result = await singleton; + expect(result).toBe(1); + }); + + it("should throw NoValuesError when no values are emitted", async () => { + const singleton = new SingletonAsyncObservable(async function* () { + // No values emitted + }); + await expect(singleton).rejects.toThrow(NoValuesError); + }); + + it("should cancel the subscription after the first value is emitted", async () => { + const mockCancel = vi.fn(); + const singleton = new SingletonAsyncObservable(async function* () { + try { + yield 42; + yield 43; + yield 44; + } finally { + mockCancel(); + } + }); + + await singleton; + // subscriber count should be 0 after the first value is emitted + expect(mockCancel).toHaveBeenCalled(); + }); + + it("should work with await syntax", async () => { + const singleton = new SingletonAsyncObservable(async function* () { + yield 42; + }); + const result = await singleton; + expect(result).toBe(42); + }); + + it("should support onfulfilled callback", async () => { + const singleton = new SingletonAsyncObservable(async function* () { + yield 42; + }); + + const result = await singleton.then((value) => value * 2); + expect(result).toBe(84); + }); + + it("should support onrejected callback for error handling", async () => { + const error = new Error("Test error"); + const singleton = new SingletonAsyncObservable(async function* () { + throw error; + }); + + const result = await singleton.then( + () => "success", + () => "error handled" + ); + + expect(result).toBe("error handled"); + }); + }); +}); + +describe("singletonFrom function", () => { + it("should create a SingletonAsyncObservable from an AsyncObservable", async () => { + const source = new AsyncObservable(async function* () { + yield 42; + }); + + const singleton = singletonFrom(source); + expect(singleton).toBeInstanceOf(SingletonAsyncObservable); + }); + + it("should preserve the generator function from the source", async () => { + const generator = async function* () { + yield 42; + }; + + const source = new AsyncObservable(generator); + const singleton = singletonFrom(source); + + // Test that the generator function is preserved by checking if it produces the same result + const result = await singleton; + expect(result).toBe(42); + }); + + it("should preserve the scheduler from the source", async () => { + const source = new AsyncObservable(async function* () { + yield 42; + }); + + const customScheduler = source._scheduler; + const singleton = singletonFrom(source); + + expect(singleton._scheduler).toBe(customScheduler); + }); + + it("should allow awaiting the first value", async () => { + const source = new AsyncObservable(async function* () { + yield 1; + yield 2; + yield 3; + }); + + const singleton = singletonFrom(source); + const result = await singleton; + + expect(result).toBe(1); + }); +}); From c2572a15ee4373acb56ab4ad4d2b7f2136b32c5e Mon Sep 17 00:00:00 2001 From: hntrl Date: Sat, 12 Apr 2025 14:41:47 -0600 Subject: [PATCH 5/6] feat(docs): add callout to singleton observables --- docs/guide/concepts/transforming-data.md | 29 ++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/docs/guide/concepts/transforming-data.md b/docs/guide/concepts/transforming-data.md index 0811347..1d470d2 100644 --- a/docs/guide/concepts/transforming-data.md +++ b/docs/guide/concepts/transforming-data.md @@ -94,6 +94,35 @@ await processed$.subscribe(console.log); // -> 4 (after reduce, 4 + 0 = 4) ``` +## Singleton Operators + +The output of some operators is an observable that only emits a single value. For those special cases, the observable that is returned is what is known as a "singleton" observable. (See [SingletonAsyncObservable](/reference/_eventkit/base/SingletonAsyncObservable)) + +All a singleton observable does is extend [AsyncObservable](/reference/_eventkit/base/AsyncObservable) class and implement the `PromiseLike` interface, which means that when it's used in an `await` statement (or with the `then` method) it will return a promise that will subscribe to the observable and resolve with the first (and only) value emitted. + +This means that when dealing with a singleton observable, all you need to do is `await` the observable to get the emitted value instead of calling `subscribe` and hoisting the value out of the callback. + +For example, the [`first`](/reference/_eventkit/base/first) operator is a singleton operator that emits the first value of an observable and then completes. + +```ts +import { AsyncObservable, first } from "@eventkit/base"; + +const obs = AsyncObservable.from([1, 2, 3]); +const singleton = obs.pipe(first()); + +// instead of this: +let firstValue: number | undefined; +await obs.subscribe((value) => { + firstValue = value; +}); +console.log(firstValue); // 1 + +// you can just do this: +console.log(await singleton); // 1 +``` + +Singleton observables are meant to provide a shorthand for dealing with observables that only emit a single value. They are still observables in every other way, so you can still use methods like `pipe`, `subscribe`, `drain`, etc. + ## Available Operators Eventkit provides a variety of built-in operators to handle common transformations. A complete reference of all operators can be found [here](/reference/operators). From 1371b774b5409b5aa45e56fb215b27ab7233bd9b Mon Sep 17 00:00:00 2001 From: hntrl Date: Sat, 12 Apr 2025 14:47:35 -0600 Subject: [PATCH 6/6] chore: add singleton changeset --- .changeset/fine-parents-serve.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 .changeset/fine-parents-serve.md diff --git a/.changeset/fine-parents-serve.md b/.changeset/fine-parents-serve.md new file mode 100644 index 0000000..a14e76a --- /dev/null +++ b/.changeset/fine-parents-serve.md @@ -0,0 +1,22 @@ +--- +"@eventkit/base": minor +--- + +Introduces `SingletonAsyncObservable`; a utility class for observables that lets you access the value emitted by observables that emit one (and only one) value (like the observables returned from `reduce()`, `count()`, etc.) using native await syntax. + +This makes the consumption of these single value operators a little bit more readable. For instance: + +```ts +const obs = AsyncObservable.from([1, 2, 3]); +const singleton = obs.pipe(first()); + +// instead of this: +let firstValue: number | undefined; +await obs.subscribe((value) => { + firstValue = value; +}); +console.log(firstValue); // 1 + +// you can just do this: +console.log(await singleton); // 1 +```