diff --git a/.changeset/olive-doodles-fix.md b/.changeset/olive-doodles-fix.md new file mode 100644 index 0000000..85ff0f3 --- /dev/null +++ b/.changeset/olive-doodles-fix.md @@ -0,0 +1,5 @@ +--- +"@eventkit/base": minor +--- + +Introduces **10** new operators into eventkit: `find`, `findIndex`, `first`, `isEmpty`, `last`, `max`, `min`, `pairwise`, `skip`, and `every`. See the [docs](https://hntrl.github.io/eventkit/guide/concepts/transforming-data#available-operators) for a complete reference. diff --git a/.changeset/sad-radios-rhyme.md b/.changeset/sad-radios-rhyme.md new file mode 100644 index 0000000..e700b8a --- /dev/null +++ b/.changeset/sad-radios-rhyme.md @@ -0,0 +1,5 @@ +--- +"@eventkit/base": patch +--- + +Fixed some invariant behavior with the `reduce` operator where the chain of accumulator calls depending on the seed value wasn't consistent with the native array method diff --git a/docs/guide/concepts/transforming-data.md b/docs/guide/concepts/transforming-data.md index 1d470d2..2914fde 100644 --- a/docs/guide/concepts/transforming-data.md +++ b/docs/guide/concepts/transforming-data.md @@ -125,7 +125,57 @@ Singleton observables are meant to provide a shorthand for dealing with observab ## Available Operators -Eventkit provides a variety of built-in operators to handle common transformations. A complete reference of all operators can be found [here](/reference/operators). +Eventkit provides a variety of built-in operators to handle common transformations. Below is a complete list of all the operators that ship as standard with eventkit. + +### Join Operators + +- [concat](/reference/_eventkit/base/concat) +- [concatAll](/reference/_eventkit/base/concatAll) +- [concatMap](/reference/_eventkit/base/concatMap) +- [merge](/reference/_eventkit/base/merge) +- [mergeAll](/reference/_eventkit/base/mergeAll) +- [mergeMap](/reference/_eventkit/base/mergeMap) + +### Transformation Operators + +- [buffer](/reference/_eventkit/base/buffer) +- [bufferCount](/reference/_eventkit/base/bufferCount) +- [map](/reference/_eventkit/base/map) +- [pairwise](/reference/_eventkit/base/pairwise) +- [partition](/reference/_eventkit/base/partition) +- [takeUntil](/reference/_eventkit/base/takeUntil) + +### Filtering Operators + +- [elementAt](/reference/_eventkit/base/elementAt) +- [filter](/reference/_eventkit/base/filter) +- [find](/reference/_eventkit/base/find) +- [findIndex](/reference/_eventkit/base/findIndex) +- [first](/reference/_eventkit/base/first) +- [last](/reference/_eventkit/base/last) +- [skip](/reference/_eventkit/base/skip) + +### Error Handling Operators + +- [dlq](/reference/_eventkit/base/dlq) +- [retry](/reference/_eventkit/base/retry) + +### Scheduling Operators + +- [withOwnScheduler](/reference/_eventkit/base/withOwnScheduler) +- [withScheduler](/reference/_eventkit/base/withScheduler) + +### Boolean Operators + +- [every](/reference/_eventkit/base/every) +- [isEmpty](/reference/_eventkit/base/isEmpty) + +### Aggregation Operators + +- [max](/reference/_eventkit/base/max) +- [min](/reference/_eventkit/base/min) +- [count](/reference/_eventkit/base/count) +- [reduce](/reference/_eventkit/base/reduce) ## Creating Custom Operators diff --git a/packages/eventkit/__tests__/operators/every.spec.ts b/packages/eventkit/__tests__/operators/every.spec.ts new file mode 100644 index 0000000..6cadbd5 --- /dev/null +++ b/packages/eventkit/__tests__/operators/every.spec.ts @@ -0,0 +1,236 @@ +import { AsyncObservable } from "@eventkit/async-observable"; +import { every } from "../../lib/operators/every"; +import { vi, describe, it, expect } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; + +describe("every", () => { + it("should return a SingletonAsyncObservable", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(every((x: number) => x > 0)); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when all values satisfy the predicate", () => { + it("should emit true when all values pass", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(every((x) => x > 0)); + expect(result).toBe(true); + }); + + it("should complete after emitting true", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const completionSpy = vi.fn(); + + const sub = source.pipe(every((x) => x > 0)).subscribe(() => {}); + sub.finally(completionSpy); + + await sub; + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([2, 4, 6, 8]); + const result = await source.pipe(every((x) => x % 2 === 0)); + expect(result).toBe(true); + }); + + it("should handle empty observables (emit true)", async () => { + const source = AsyncObservable.from([]); + const result = await source.pipe(every((x) => !!x)); + expect(result).toBe(true); + }); + }); + + describe("when any value fails the predicate", () => { + it("should emit false immediately when a value fails", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(every((x) => x <= 3)); + expect(result).toBe(false); + }); + + it("should cancel the source observable after emitting false", async () => { + const cancelSpy = vi.fn(); + const nextSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 4; // This fails the predicate + yield 3; + nextSpy(); + yield 5; + } finally { + cancelSpy(); + } + }); + + await source.pipe(every((x) => x <= 3)); + expect(cancelSpy).toHaveBeenCalledTimes(1); + expect(nextSpy).not.toHaveBeenCalled(); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([2, 4, 5, 8]); + const result = await source.pipe(every((x) => x % 2 === 0)); + expect(result).toBe(false); + }); + + it("should handle the first value failing", async () => { + const source = AsyncObservable.from([0, 2, 4, 6]); + const result = await source.pipe(every((x) => x > 0)); + expect(result).toBe(false); + }); + + it("should handle a middle value failing", async () => { + const source = AsyncObservable.from([2, 4, 5, 6, 8]); + const result = await source.pipe(every((x) => x % 2 === 0)); + expect(result).toBe(false); + }); + + it("should handle the last value failing", async () => { + const source = AsyncObservable.from([2, 4, 6, 7]); + const result = await source.pipe(every((x) => x % 2 === 0)); + expect(result).toBe(false); + }); + }); + + describe("when predicate throws an error", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + let caughtError: Error | null = null; + try { + await source.pipe( + every(() => { + throw error; + }) + ); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should cancel the source observable", async () => { + const error = new Error("predicate error"); + const cancelSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + } finally { + cancelSpy(); + } + }); + + try { + await source.pipe( + every(() => { + throw error; + }) + ); + } catch (e) { + // Expected error + } + + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + + it("should not emit any value", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + const nextSpy = vi.fn(); + try { + await source + .pipe( + every(() => { + throw error; + }) + ) + .subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when source observable errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(every((x) => x > 0)); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(every((x) => x > 0)).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when predicate uses the index parameter", () => { + it("should pass the correct index to the predicate", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const indexSpy = vi.fn().mockReturnValue(true); + + await source.pipe(every(indexSpy)); + + expect(indexSpy).toHaveBeenNthCalledWith(1, "a", 0); + expect(indexSpy).toHaveBeenNthCalledWith(2, "b", 1); + expect(indexSpy).toHaveBeenNthCalledWith(3, "c", 2); + }); + + it("should increment index for each value", async () => { + const source = AsyncObservable.from([10, 20, 30]); + const indices: number[] = []; + + await source.pipe( + every((_, index) => { + indices.push(index); + return true; + }) + ); + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should start index at 0", async () => { + const source = AsyncObservable.from([5, 10, 15]); + const indexSpy = vi.fn().mockReturnValue(true); + + await source.pipe(every(indexSpy)); + + expect(indexSpy).toHaveBeenCalledWith(5, 0); + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/find.spec.ts b/packages/eventkit/__tests__/operators/find.spec.ts new file mode 100644 index 0000000..fbb8f9d --- /dev/null +++ b/packages/eventkit/__tests__/operators/find.spec.ts @@ -0,0 +1,252 @@ +import { AsyncObservable } from "@eventkit/async-observable"; +import { find } from "../../lib/operators/find"; +import { vi, describe, it, expect } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; + +describe("find", () => { + it("should return a SingletonAsyncObservable", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(find((x: number) => x > 0)); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when using a function predicate", () => { + it("should emit the first value that satisfies the predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(find((x) => x > 3)); + expect(result).toBe(4); + }); + + it("should emit undefined if no value satisfies the predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(find((x) => x > 10)); + expect(result).toBeUndefined(); + }); + + it("should cancel the source after finding a match", async () => { + const cancelSpy = vi.fn(); + const nextSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + yield 4; + nextSpy(); + yield 5; + } finally { + cancelSpy(); + } + }); + + await source.pipe(find((x) => x === 3)); + expect(cancelSpy).toHaveBeenCalledTimes(1); + expect(nextSpy).not.toHaveBeenCalled(); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(find((x) => x === 3)); + expect(result).toBe(3); + }); + + it("should pass the correct index to the predicate", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const indexSpy = vi.fn().mockReturnValue(false); + + await source.pipe(find(indexSpy)); + + expect(indexSpy).toHaveBeenNthCalledWith(1, "a", 0); + expect(indexSpy).toHaveBeenNthCalledWith(2, "b", 1); + expect(indexSpy).toHaveBeenNthCalledWith(3, "c", 2); + }); + + it("should increment index for each value", async () => { + const source = AsyncObservable.from([10, 20, 30]); + const indices: number[] = []; + + await source.pipe( + find((_, index) => { + indices.push(index); + return false; + }) + ); + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should start index at 0", async () => { + const source = AsyncObservable.from([5, 10, 15]); + const indexSpy = vi.fn().mockReturnValue(true); + + await source.pipe(find(indexSpy)); + + expect(indexSpy).toHaveBeenCalledWith(5, 0); + }); + }); + + describe("when using BooleanConstructor", () => { + it("should emit the first truthy value", async () => { + const source = AsyncObservable.from([0, "", false, 42, null]); + const result = await source.pipe(find(Boolean)); + expect(result).toBe(42); + }); + + it("should emit undefined if no truthy values exist", async () => { + const source = AsyncObservable.from([0, "", false, null, undefined]); + const result = await source.pipe(find(Boolean)); + expect(result).toBeUndefined(); + }); + + describe("should handle various truthy/falsy values", () => { + it("should handle numbers (0, 1, -1)", async () => { + const source = AsyncObservable.from([0, 1, -1]); + const result = await source.pipe(find(Boolean)); + expect(result).toBe(1); // 1 is the first truthy value + }); + + it('should handle strings ("", "hello")', async () => { + const source = AsyncObservable.from(["", "hello"]); + const result = await source.pipe(find(Boolean)); + expect(result).toBe("hello"); // "hello" is the first truthy value + }); + + it("should handle objects (null, {}, [])", async () => { + const source = AsyncObservable.from([null, {}, []]); + const result = await source.pipe(find(Boolean)); + expect(result).toEqual({}); // {} is the first truthy value + }); + + it("should handle booleans (false, true)", async () => { + const source = AsyncObservable.from([false, true]); + const result = await source.pipe(find(Boolean)); + expect(result).toBe(true); // true is the first truthy value + }); + }); + }); + + describe("when predicate throws an error", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + let caughtError: Error | null = null; + try { + await source.pipe( + find(() => { + throw error; + }) + ); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should cancel the source observable", async () => { + const error = new Error("predicate error"); + const cancelSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + } finally { + cancelSpy(); + } + }); + + try { + await source.pipe( + find(() => { + throw error; + }) + ); + } catch (e) { + // Expected error + } + + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + + it("should not emit any value", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + const nextSpy = vi.fn(); + try { + await source + .pipe( + find(() => { + throw error; + }) + ) + .subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when source observable errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + yield 2; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(find((x) => x > 1)); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(find((x) => x > 1)).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("type narrowing with predicate", () => { + it("should narrow the type when using type predicate", async () => { + type Item = string | number; + const source = AsyncObservable.from([1, "a", 2, "b"]); + + const result = await source.pipe(find((x): x is string => typeof x === "string")); + + expect(typeof result).toBe("string"); + expect(result).toBe("a"); + }); + + it("should maintain original type when using boolean predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4]); + const result = await source.pipe(find((x) => x > 2)); + + expect(typeof result).toBe("number"); + expect(result).toBe(3); + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/findIndex.spec.ts b/packages/eventkit/__tests__/operators/findIndex.spec.ts new file mode 100644 index 0000000..cf72104 --- /dev/null +++ b/packages/eventkit/__tests__/operators/findIndex.spec.ts @@ -0,0 +1,255 @@ +import { AsyncObservable } from "@eventkit/async-observable"; +import { findIndex } from "../../lib/operators/findIndex"; +import { vi, describe, it, expect } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; + +describe("findIndex", () => { + it("should return a SingletonAsyncObservable", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(findIndex((x: number) => x > 2)); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when using a function predicate", () => { + it("should emit the index of the first value that satisfies the predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(findIndex((x) => x > 3)); + expect(result).toBe(3); + }); + + it("should emit -1 if no value satisfies the predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(findIndex((x) => x > 10)); + expect(result).toBe(-1); + }); + + it("should cancel the source after finding a match", async () => { + const cancelSpy = vi.fn(); + const nextSpy = vi.fn(); + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + yield 4; + nextSpy(); + yield 5; + } finally { + cancelSpy(); + } + }); + + await source.pipe(findIndex((x) => x === 3)); + expect(cancelSpy).toHaveBeenCalledTimes(1); + expect(nextSpy).not.toHaveBeenCalled(); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const index = await source.pipe(findIndex((x) => x === 3)); + expect(index).toBe(2); + }); + + it("should pass the correct index to the predicate", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const indexSpy = vi.fn().mockReturnValue(false); + + await source.pipe(findIndex(indexSpy)); + + expect(indexSpy).toHaveBeenNthCalledWith(1, "a", 0); + expect(indexSpy).toHaveBeenNthCalledWith(2, "b", 1); + expect(indexSpy).toHaveBeenNthCalledWith(3, "c", 2); + }); + + it("should increment index for each value", async () => { + const source = AsyncObservable.from([10, 20, 30]); + const indices: number[] = []; + + await source.pipe( + findIndex((_, index) => { + indices.push(index); + return false; + }) + ); + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should start index at 0", async () => { + const source = AsyncObservable.from([5, 10, 15]); + const indexSpy = vi.fn().mockReturnValue(true); + + await source.pipe(findIndex(indexSpy)); + + expect(indexSpy).toHaveBeenCalledWith(5, 0); + }); + + it("should handle empty observables (emit -1)", async () => { + const source = AsyncObservable.from([]); + const result = await source.pipe(findIndex((x) => !!x)); + expect(result).toBe(-1); + }); + }); + + describe("when using BooleanConstructor", () => { + it("should emit the index of the first truthy value", async () => { + const source = AsyncObservable.from([0, "", false, 42, null]); + const result = await source.pipe(findIndex(Boolean)); + expect(result).toBe(3); + }); + + it("should emit -1 if no truthy values exist", async () => { + const source = AsyncObservable.from([0, "", false, null, undefined]); + const result = await source.pipe(findIndex(Boolean)); + expect(result).toBe(-1); + }); + + describe("should handle various truthy/falsy values", () => { + it("should handle numbers (0, 1, -1)", async () => { + const source = AsyncObservable.from([0, 1, -1]); + const result = await source.pipe(findIndex(Boolean)); + expect(result).toBe(1); // 1 is the first truthy value + }); + + it('should handle strings ("", "hello")', async () => { + const source = AsyncObservable.from(["", "hello"]); + const result = await source.pipe(findIndex(Boolean)); + expect(result).toBe(1); // "hello" is the first truthy value + }); + + it("should handle objects (null, {}, [])", async () => { + const source = AsyncObservable.from([null, {}, []]); + const result = await source.pipe(findIndex(Boolean)); + expect(result).toBe(1); // {} is the first truthy value + }); + + it("should handle booleans (false, true)", async () => { + const source = AsyncObservable.from([false, true]); + const result = await source.pipe(findIndex(Boolean)); + expect(result).toBe(1); // true is the first truthy value + }); + }); + }); + + describe("when predicate throws an error", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + let caughtError: Error | null = null; + try { + await source.pipe( + findIndex(() => { + throw error; + }) + ); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should cancel the source observable", async () => { + const error = new Error("predicate error"); + const cancelSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + } finally { + cancelSpy(); + } + }); + + try { + await source.pipe( + findIndex(() => { + throw error; + }) + ); + } catch (e) { + // Expected error + } + + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + + it("should not emit any value", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + const nextSpy = vi.fn(); + try { + await source + .pipe( + findIndex(() => { + throw error; + }) + ) + .subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when source observable errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(findIndex((x) => x > 1)); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(findIndex((x) => x > 1)).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("type narrowing with predicate", () => { + it("should return number when using function predicate", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(findIndex((x) => x > 1)); + expect(typeof result).toBe("number"); + }); + + it("should return -1 | number when using BooleanConstructor", async () => { + const source = AsyncObservable.from([false, null, undefined]); + const result = await source.pipe(findIndex(Boolean)); + expect(result).toBe(-1); + + const source2 = AsyncObservable.from([false, true]); + const result2 = await source2.pipe(findIndex(Boolean)); + expect(typeof result2).toBe("number"); + expect(result2).toBe(1); + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/first.spec.ts b/packages/eventkit/__tests__/operators/first.spec.ts new file mode 100644 index 0000000..c7994d6 --- /dev/null +++ b/packages/eventkit/__tests__/operators/first.spec.ts @@ -0,0 +1,335 @@ +import { AsyncObservable } from "@eventkit/async-observable"; +import { first } from "../../lib/operators/first"; +import { vi, describe, it, expect } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; +import { NoValuesError } from "../../lib/utils/errors"; + +describe("first", () => { + it("should return a SingletonAsyncObservable", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(first()); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when no predicate is provided", () => { + it("should emit the first value from the source", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(first()); + expect(result).toBe(1); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + await expect(source.pipe(first())).rejects.toThrow(NoValuesError); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const result = await source.pipe(first()); + expect(result).toBe("a"); + }); + + it("should cancel the source after emitting first value", async () => { + const cancelSpy = vi.fn(); + const nextSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + nextSpy(); + yield 2; + yield 3; + } finally { + cancelSpy(); + } + }); + + await source.pipe(first()); + expect(cancelSpy).toHaveBeenCalledTimes(1); + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when using a function predicate", () => { + it("should emit the first value that satisfies the predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(first((x) => x > 3)); + expect(result).toBe(4); + }); + + it("should pass the correct index to the predicate", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const indexSpy = vi.fn().mockReturnValue(false); + + try { + await source.pipe(first(indexSpy)); + } catch (e) { + // Expected error + } + + expect(indexSpy).toHaveBeenNthCalledWith(1, "a", 0); + expect(indexSpy).toHaveBeenNthCalledWith(2, "b", 1); + expect(indexSpy).toHaveBeenNthCalledWith(3, "c", 2); + }); + + it("should increment index for each value", async () => { + const source = AsyncObservable.from([10, 20, 30]); + const indices: number[] = []; + + try { + await source.pipe( + first((_, index) => { + indices.push(index); + return false; + }) + ); + } catch (e) { + // Expected error + } + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should start index at 0", async () => { + const source = AsyncObservable.from([5, 10, 15]); + const indexSpy = vi.fn().mockReturnValue(false); + + try { + await source.pipe(first(indexSpy)); + } catch (e) { + // Expected error + } + + expect(indexSpy).toHaveBeenCalledWith(5, 0); + }); + + it("should cancel the source after finding a match", async () => { + const cancelSpy = vi.fn(); + const nextSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + nextSpy(); + yield 4; + yield 5; + } finally { + cancelSpy(); + } + }); + + await source.pipe(first((x) => x === 3)); + expect(cancelSpy).toHaveBeenCalledTimes(1); + expect(nextSpy).not.toHaveBeenCalled(); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(first((x) => x > 2)); + expect(result).toBe(3); + }); + }); + + describe("when using BooleanConstructor", () => { + it("should emit the first truthy value", async () => { + const source = AsyncObservable.from([0, "", false, 42, null]); + const result = await source.pipe(first(Boolean)); + expect(result).toBe(42); + }); + + describe("should handle various truthy/falsy values", () => { + it("should handle numbers (0, 1, -1)", async () => { + const source = AsyncObservable.from([0, 1, -1]); + const result = await source.pipe(first(Boolean)); + expect(result).toBe(1); // 1 is the first truthy value + }); + + it('should handle strings ("", "hello")', async () => { + const source = AsyncObservable.from(["", "hello"]); + const result = await source.pipe(first(Boolean)); + expect(result).toBe("hello"); // "hello" is the first truthy value + }); + + it("should handle objects (null, {}, [])", async () => { + const source = AsyncObservable.from([null, {}, []]); + const result = await source.pipe(first(Boolean)); + expect(result).toEqual({}); // {} is the first truthy value + }); + + it("should handle booleans (false, true)", async () => { + const source = AsyncObservable.from([false, true]); + const result = await source.pipe(first(Boolean)); + expect(result).toBe(true); // true is the first truthy value + }); + }); + }); + + describe("when using type predicate", () => { + it("should narrow the type when using type predicate", async () => { + type Item = string | number; + const source = AsyncObservable.from([1, "a", 2, "b"]); + + const result = await source.pipe(first((x): x is string => typeof x === "string")); + + expect(typeof result).toBe("string"); + expect(result).toBe("a"); + }); + + it("should maintain original type when using boolean predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4]); + const result = await source.pipe(first((x) => x > 2)); + + expect(typeof result).toBe("number"); + expect(result).toBe(3); + }); + }); + + describe("when default value is provided", () => { + it("should emit default value when no value satisfies predicate", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(first((x) => x > 10, 999)); + expect(result).toBe(999); + }); + + it("should emit default value when source is empty", async () => { + const source = AsyncObservable.from([]); + const result = await source.pipe(first(null, "default")); + expect(result).toBe("default"); + }); + + it("should not throw NoValuesError when default value is provided", async () => { + const source = AsyncObservable.from([]); + + // Should not throw + const result = await source.pipe(first(null, "default")); + expect(result).toBe("default"); + }); + + it("should work with different types for default value", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(first((x) => x > 10, "not found")); + expect(result).toBe("not found"); + }); + }); + + describe("when no default value is provided", () => { + it("should throw NoValuesError when no value satisfies predicate", async () => { + const source = AsyncObservable.from([1, 2, 3]); + + await expect(async () => { + await source.pipe(first((x) => x > 10)); + }).rejects.toThrow(NoValuesError); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + + await expect(source.pipe(first())).rejects.toThrow(NoValuesError); + }); + }); + + describe("when predicate throws an error", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + let caughtError: Error | null = null; + try { + await source.pipe( + first(() => { + throw error; + }) + ); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should cancel the source observable", async () => { + const error = new Error("predicate error"); + const cancelSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + } finally { + cancelSpy(); + } + }); + + try { + await source.pipe( + first(() => { + throw error; + }) + ); + } catch (e) { + // Expected error + } + + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + + it("should not emit any value", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + const nextSpy = vi.fn(); + try { + await source + .pipe( + first(() => { + throw error; + }) + ) + .subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when source observable errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(first((x) => x > 1)); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(first((x) => x > 1)).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/isEmpty.spec.ts b/packages/eventkit/__tests__/operators/isEmpty.spec.ts new file mode 100644 index 0000000..9d82c1e --- /dev/null +++ b/packages/eventkit/__tests__/operators/isEmpty.spec.ts @@ -0,0 +1,142 @@ +import { describe, expect, it, vi } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; +import { AsyncObservable } from "@eventkit/async-observable"; +import { isEmpty } from "../../lib/operators/isEmpty"; + +describe("isEmpty", () => { + it("should return a SingletonAsyncObservable", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(isEmpty()); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when source emits values", () => { + it("should emit false immediately when first value is emitted", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(isEmpty()); + expect(result).toBe(false); + }); + + it("should cancel the source after emitting false", async () => { + const cancelSpy = vi.fn(); + const nextSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + nextSpy(); + yield 2; + yield 3; + } finally { + cancelSpy(); + } + }); + + await source.pipe(isEmpty()); + expect(cancelSpy).toHaveBeenCalledTimes(1); + expect(nextSpy).not.toHaveBeenCalled(); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const result = await source.pipe(isEmpty()); + expect(result).toBe(false); + }); + + it("should handle any type of value", async () => { + const numSource = AsyncObservable.from([1]); + const strSource = AsyncObservable.from(["string"]); + const objSource = AsyncObservable.from([{}]); + const boolSource = AsyncObservable.from([true]); + + expect(await numSource.pipe(isEmpty())).toBe(false); + expect(await strSource.pipe(isEmpty())).toBe(false); + expect(await objSource.pipe(isEmpty())).toBe(false); + expect(await boolSource.pipe(isEmpty())).toBe(false); + }); + }); + + describe("when source is empty", () => { + it("should emit true when source completes without values", async () => { + const source = AsyncObservable.from([]); + const result = await source.pipe(isEmpty()); + expect(result).toBe(true); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([]); + const result = await source.pipe(isEmpty()); + expect(result).toBe(true); + }); + + it("should handle empty observables", async () => { + const source = new AsyncObservable(async function* () { + // Empty generator that just completes + }); + + const result = await source.pipe(isEmpty()); + expect(result).toBe(true); + }); + }); + + describe("when source errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(isEmpty()); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(isEmpty()).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("performance characteristics", () => { + it("should not process values after emitting false", async () => { + const processSpy = vi.fn(); + const source = new AsyncObservable(async function* () { + yield 1; + processSpy(); + yield 2; + processSpy(); + yield 3; + }); + + await source.pipe(isEmpty()); + expect(processSpy).not.toHaveBeenCalled(); + }); + + it("should complete quickly when source is empty", async () => { + const source = AsyncObservable.from([]); + + const startTime = performance.now(); + await source.pipe(isEmpty()); + const endTime = performance.now(); + + // Should complete very quickly (under 50ms) + expect(endTime - startTime).toBeLessThan(50); + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/last.spec.ts b/packages/eventkit/__tests__/operators/last.spec.ts new file mode 100644 index 0000000..8d5e8ac --- /dev/null +++ b/packages/eventkit/__tests__/operators/last.spec.ts @@ -0,0 +1,318 @@ +import { AsyncObservable } from "@eventkit/async-observable"; +import { last } from "../../lib/operators/last"; +import { vi, describe, it, expect } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; +import { NoValuesError } from "../../lib/utils/errors"; + +describe("last", () => { + it("should return a SingletonAsyncObservable", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(last()); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when no predicate is provided", () => { + it("should emit the last value from the source", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(last()); + expect(result).toBe(3); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + + await expect(source.pipe(last())).rejects.toThrow(NoValuesError); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const result = await source.pipe(last()); + expect(result).toBe("c"); + }); + + it("should process all values before emitting", async () => { + const processSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + yield 1; + processSpy(1); + yield 2; + processSpy(2); + yield 3; + processSpy(3); + }); + + await source.pipe(last()); + expect(processSpy).toHaveBeenCalledTimes(3); + expect(processSpy).toHaveBeenNthCalledWith(1, 1); + expect(processSpy).toHaveBeenNthCalledWith(2, 2); + expect(processSpy).toHaveBeenNthCalledWith(3, 3); + }); + }); + + describe("when using a function predicate", () => { + it("should emit the last value that satisfies the predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(last((x) => x % 2 === 0)); + expect(result).toBe(4); + }); + + it("should pass the correct index to the predicate", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const indexSpy = vi.fn().mockReturnValue(true); + + await source.pipe(last(indexSpy)); + + expect(indexSpy).toHaveBeenNthCalledWith(1, "a", 0); + expect(indexSpy).toHaveBeenNthCalledWith(2, "b", 1); + expect(indexSpy).toHaveBeenNthCalledWith(3, "c", 2); + }); + + it("should increment index for each value", async () => { + const source = AsyncObservable.from([10, 20, 30]); + const indices: number[] = []; + + await source.pipe( + last((_, index) => { + indices.push(index); + return true; + }) + ); + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should start index at 0", async () => { + const source = AsyncObservable.from([5, 10, 15]); + const indexSpy = vi.fn().mockReturnValue(true); + + await source.pipe(last(indexSpy)); + + expect(indexSpy).toHaveBeenCalledWith(5, 0); + }); + + it("should process all values before emitting", async () => { + const processSpy = vi.fn(); + const matchingValueSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + yield 2; // Matches + matchingValueSpy(2); + processSpy(1); + yield 3; // Doesn't match + processSpy(2); + yield 4; // Matches - should be the result + matchingValueSpy(4); + processSpy(3); + }); + + const result = await source.pipe(last((x) => x % 2 === 0)); + expect(result).toBe(4); + expect(processSpy).toHaveBeenCalledTimes(3); + expect(matchingValueSpy).toHaveBeenCalledTimes(2); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const result = await source.pipe(last((x) => x < 4)); + expect(result).toBe(3); + }); + }); + + describe("when using BooleanConstructor", () => { + it("should emit the last truthy value", async () => { + const source = AsyncObservable.from([42, 0, "", false, "last"]); + const result = await source.pipe(last(Boolean)); + expect(result).toBe("last"); + }); + + describe("should handle various truthy/falsy values", () => { + it("should handle numbers (0, 1, -1)", async () => { + const source = AsyncObservable.from([1, 0, 2]); + const result = await source.pipe(last(Boolean)); + expect(result).toBe(2); // 2 is the last truthy value + }); + + it('should handle strings ("", "hello")', async () => { + const source = AsyncObservable.from(["hello", "", "world"]); + const result = await source.pipe(last(Boolean)); + expect(result).toBe("world"); // "world" is the last truthy value + }); + + it("should handle objects (null, {}, [])", async () => { + const source = AsyncObservable.from([{}, null, []]); + const result = await source.pipe(last(Boolean)); + expect(result).toEqual([]); // [] is the last truthy value + }); + + it("should handle booleans (false, true)", async () => { + const source = AsyncObservable.from([true, false, true]); + const result = await source.pipe(last(Boolean)); + expect(result).toBe(true); // true is the last truthy value + }); + }); + }); + + describe("when using type predicate", () => { + it("should narrow the type when using type predicate", async () => { + type Item = string | number; + const source = AsyncObservable.from([1, "a", 2, "b"]); + + const result = await source.pipe(last((x): x is string => typeof x === "string")); + + expect(typeof result).toBe("string"); + expect(result).toBe("b"); + }); + + it("should maintain original type when using boolean predicate", async () => { + const source = AsyncObservable.from([1, 2, 3, 4]); + const result = await source.pipe(last((x) => x > 2)); + + expect(typeof result).toBe("number"); + expect(result).toBe(4); + }); + }); + + describe("when default value is provided", () => { + it("should emit default value when no value satisfies predicate", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(last((x) => x > 10, 999)); + expect(result).toBe(999); + }); + + it("should emit default value when source is empty", async () => { + const source = AsyncObservable.from([]); + const result = await source.pipe(last(null, "default")); + expect(result).toBe("default"); + }); + + it("should not throw NoValuesError when default value is provided", async () => { + const source = AsyncObservable.from([]); + + // Should not throw + const result = await source.pipe(last(null, "default")); + expect(result).toBe("default"); + }); + + it("should work with different types for default value", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const result = await source.pipe(last((x) => x > 10, "not found")); + expect(result).toBe("not found"); + }); + }); + + describe("when no default value is provided", () => { + it("should throw NoValuesError when no value satisfies predicate", async () => { + const source = AsyncObservable.from([1, 2, 3]); + await expect(source.pipe(last((x) => x > 10))).rejects.toThrow(NoValuesError); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + await expect(source.pipe(last())).rejects.toThrow(NoValuesError); + }); + }); + + describe("when predicate throws an error", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + let caughtError: Error | null = null; + try { + await source.pipe( + last(() => { + throw error; + }) + ); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should cancel the source observable", async () => { + const error = new Error("predicate error"); + const cancelSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 1; + yield 2; + yield 3; + } finally { + cancelSpy(); + } + }); + + try { + await source.pipe( + last(() => { + throw error; + }) + ); + } catch (e) { + // Expected error + } + + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); + + it("should not emit any value", async () => { + const error = new Error("predicate error"); + const source = AsyncObservable.from([1, 2, 3]); + + const nextSpy = vi.fn(); + try { + await source + .pipe( + last(() => { + throw error; + }) + ) + .subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when source observable errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(last()); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(last()).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/max.spec.ts b/packages/eventkit/__tests__/operators/max.spec.ts new file mode 100644 index 0000000..675ed4d --- /dev/null +++ b/packages/eventkit/__tests__/operators/max.spec.ts @@ -0,0 +1,216 @@ +import { describe, expect, it, vi } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; +import { AsyncObservable } from "@eventkit/async-observable"; +import { max } from "../../lib/operators/max"; +import { NoValuesError } from "../../lib/utils/errors"; + +describe("max", () => { + it("should return a SingletonOperatorFunction", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(max()); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when using default comparer", () => { + it("should emit the maximum value from the source", async () => { + const source = AsyncObservable.from([1, 5, 3, 9, 2]); + const result = await source.pipe(max()); + expect(result).toBe(9); + }); + + describe("should handle numbers", () => { + it("should handle positive numbers", async () => { + const source = AsyncObservable.from([3, 1, 7, 5, 2]); + const result = await source.pipe(max()); + expect(result).toBe(7); + }); + + it("should handle negative numbers", async () => { + const source = AsyncObservable.from([-5, -2, -10, -1, -7]); + const result = await source.pipe(max()); + expect(result).toBe(-1); + }); + + it("should handle zero", async () => { + const source = AsyncObservable.from([-3, 0, -5, -2]); + const result = await source.pipe(max()); + expect(result).toBe(0); + }); + + it("should handle mixed positive and negative", async () => { + const source = AsyncObservable.from([-10, 5, -3, 8, 0, -7]); + const result = await source.pipe(max()); + expect(result).toBe(8); + }); + }); + + it("should handle strings (lexicographical comparison)", async () => { + const source = AsyncObservable.from(["apple", "banana", "orange", "cherry"]); + const result = await source.pipe(max()); + expect(result).toBe("orange"); + }); + + it("should handle dates", async () => { + const date1 = new Date(2020, 1, 1); + const date2 = new Date(2021, 1, 1); + const date3 = new Date(2019, 1, 1); + + const source = AsyncObservable.from([date1, date2, date3]); + const result = await source.pipe(max()); + expect(result).toBe(date2); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + + await expect(async () => { + await source.pipe(max()); + }).rejects.toThrow(NoValuesError); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([3, 1, 7, 5, 2]); + const result = await source.pipe(max()); + expect(result).toBe(7); + }); + }); + + describe("when using custom comparer", () => { + it("should use the provided comparer function", async () => { + const source = AsyncObservable.from([5, 1, 7, 3]); + const comparerSpy = vi.fn((a, b) => b - a); // Reverse order + + const result = await source.pipe(max(comparerSpy)); + + expect(comparerSpy).toHaveBeenCalled(); + expect(result).toBe(1); // The "maximum" when using reverse comparer is the smallest value + }); + + describe("should handle complex objects", () => { + it("should compare based on specific properties", async () => { + const users = [ + { name: "Alice", age: 25 }, + { name: "Bob", age: 40 }, + { name: "Charlie", age: 30 }, + ]; + + const source = AsyncObservable.from(users); + const result = await source.pipe(max((a, b) => a.age - b.age)); + + expect(result).toEqual({ name: "Bob", age: 40 }); + }); + + it("should handle nested objects", async () => { + const items = [ + { id: 1, data: { value: 5 } }, + { id: 2, data: { value: 10 } }, + { id: 3, data: { value: 3 } }, + ]; + + const source = AsyncObservable.from(items); + const result = await source.pipe(max((a, b) => a.data.value - b.data.value)); + + expect(result).toEqual({ id: 2, data: { value: 10 } }); + }); + }); + + it("should handle edge cases in comparison", async () => { + // Test with a comparer that considers negative values as "higher" + const source = AsyncObservable.from([5, -10, 3, -20, 1]); + const result = await source.pipe( + max((a, b) => (a < 0 && b >= 0 ? 1 : b < 0 && a >= 0 ? -1 : a - b)) + ); + + expect(result).toBe(-10); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + const customComparer = (a: number, b: number) => a - b; + + await expect(source.pipe(max(customComparer))).rejects.toThrow(NoValuesError); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([3, 1, 7, 5, 2]); + const result = await source.pipe(max((a, b) => a - b)); + expect(result).toBe(7); + }); + }); + + describe("when source observable errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(max()); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(max()).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when comparer throws an error", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("comparer error"); + const source = AsyncObservable.from([1, 2, 3]); + + let caughtError: Error | null = null; + try { + await source.pipe( + max(() => { + throw error; + }) + ); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("comparer error"); + const source = AsyncObservable.from([1, 2, 3]); + + const nextSpy = vi.fn(); + try { + await source + .pipe( + max(() => { + throw error; + }) + ) + .subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/min.spec.ts b/packages/eventkit/__tests__/operators/min.spec.ts new file mode 100644 index 0000000..fae5b00 --- /dev/null +++ b/packages/eventkit/__tests__/operators/min.spec.ts @@ -0,0 +1,250 @@ +import { describe, expect, it, vi } from "vitest"; +import { SingletonAsyncObservable } from "../../lib/singleton"; +import { AsyncObservable } from "@eventkit/async-observable"; +import { min } from "../../lib/operators/min"; +import { NoValuesError } from "../../lib/utils/errors"; + +describe("min", () => { + it("should return a SingletonOperatorFunction", async () => { + const obs = AsyncObservable.from([1, 2, 3]); + const result = obs.pipe(min()); + expect(result).toBeInstanceOf(SingletonAsyncObservable); + }); + + describe("when using default comparer", () => { + it("should emit the minimum value from the source", async () => { + const source = AsyncObservable.from([5, 1, 9, 3, 2]); + const result = await source.pipe(min()); + expect(result).toBe(1); + }); + + describe("should handle numbers", () => { + it("should handle positive numbers", async () => { + const source = AsyncObservable.from([3, 1, 7, 5, 2]); + const result = await source.pipe(min()); + expect(result).toBe(1); + }); + + it("should handle negative numbers", async () => { + const source = AsyncObservable.from([-5, -2, -10, -1, -7]); + const result = await source.pipe(min()); + expect(result).toBe(-10); + }); + + it("should handle zero", async () => { + const source = AsyncObservable.from([3, 0, 5, 2]); + const result = await source.pipe(min()); + expect(result).toBe(0); + }); + + it("should handle mixed positive and negative", async () => { + const source = AsyncObservable.from([10, -5, 3, -8, 0, 7]); + const result = await source.pipe(min()); + expect(result).toBe(-8); + }); + }); + + it("should handle strings (lexicographical comparison)", async () => { + const source = AsyncObservable.from(["apple", "banana", "orange", "cherry"]); + const result = await source.pipe(min()); + expect(result).toBe("apple"); + }); + + it("should handle dates", async () => { + const date1 = new Date(2020, 1, 1); + const date2 = new Date(2021, 1, 1); + const date3 = new Date(2019, 1, 1); + + const source = AsyncObservable.from([date1, date2, date3]); + const result = await source.pipe(min()); + expect(result).toBe(date3); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + + await expect(source.pipe(min())).rejects.toThrow(NoValuesError); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([3, 1, 7, 5, 2]); + const result = await source.pipe(min()); + expect(result).toBe(1); + }); + }); + + describe("when using custom comparer", () => { + it("should use the provided comparer function", async () => { + const source = AsyncObservable.from([5, 1, 7, 3]); + const comparerSpy = vi.fn((a, b) => b - a); // Reverse order + + const result = await source.pipe(min(comparerSpy)); + + expect(comparerSpy).toHaveBeenCalled(); + expect(result).toBe(7); // The "minimum" when using reverse comparer is the largest value + }); + + describe("should handle complex objects", () => { + it("should compare based on specific properties", async () => { + const users = [ + { name: "Alice", age: 25 }, + { name: "Bob", age: 40 }, + { name: "Charlie", age: 30 }, + ]; + + const source = AsyncObservable.from(users); + const result = await source.pipe(min((a, b) => a.age - b.age)); + + expect(result).toEqual({ name: "Alice", age: 25 }); + }); + + it("should handle nested objects", async () => { + const items = [ + { id: 1, data: { value: 5 } }, + { id: 2, data: { value: 10 } }, + { id: 3, data: { value: 3 } }, + ]; + + const source = AsyncObservable.from(items); + const result = await source.pipe(min((a, b) => a.data.value - b.data.value)); + + expect(result).toEqual({ id: 3, data: { value: 3 } }); + }); + }); + + it("should handle edge cases in comparison", async () => { + // Test with a comparer that considers negative values as "lower" + const source = AsyncObservable.from([5, -10, 3, -20, 1]); + const result = await source.pipe( + min((a, b) => (a < 0 && b >= 0 ? -1 : b < 0 && a >= 0 ? 1 : a - b)) + ); + + expect(result).toBe(-20); + }); + + it("should throw NoValuesError when source is empty", async () => { + const source = AsyncObservable.from([]); + const customComparer = (a: number, b: number) => a - b; + + await expect(source.pipe(min(customComparer))).rejects.toThrow(NoValuesError); + }); + + it("should work with await syntax", async () => { + const source = AsyncObservable.from([3, 1, 7, 5, 2]); + const result = await source.pipe(min((a, b) => a - b)); + expect(result).toBe(1); + }); + }); + + describe("when source observable errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(min()); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + throw error; + }); + + const nextSpy = vi.fn(); + try { + await source.pipe(min()).subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("when comparer throws an error", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("comparer error"); + const source = AsyncObservable.from([1, 2, 3]); + + let caughtError: Error | null = null; + try { + await source.pipe( + min(() => { + throw error; + }) + ); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value", async () => { + const error = new Error("comparer error"); + const source = AsyncObservable.from([1, 2, 3]); + + const nextSpy = vi.fn(); + try { + await source + .pipe( + min(() => { + throw error; + }) + ) + .subscribe(nextSpy); + } catch (e) { + // Expected error + } + + expect(nextSpy).not.toHaveBeenCalled(); + }); + }); + + describe("performance characteristics", () => { + it("should process all values before emitting", async () => { + const processSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + yield 5; + processSpy(1); + yield 1; + processSpy(2); + yield 3; + processSpy(3); + }); + + await source.pipe(min()); + expect(processSpy).toHaveBeenCalledTimes(3); + }); + + it("should not cancel source early", async () => { + const cancelSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + try { + yield 5; + yield 1; + yield 3; + } finally { + cancelSpy(); + } + }); + + await source.pipe(min()); + expect(cancelSpy).toHaveBeenCalledTimes(1); + // This ensures the source completed normally and wasn't cancelled early + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/pairwise.spec.ts b/packages/eventkit/__tests__/operators/pairwise.spec.ts new file mode 100644 index 0000000..19e01d0 --- /dev/null +++ b/packages/eventkit/__tests__/operators/pairwise.spec.ts @@ -0,0 +1,258 @@ +import { describe, expect, it, vi } from "vitest"; +import { AsyncObservable } from "@eventkit/async-observable"; +import { pairwise } from "../../lib/operators/pairwise"; + +const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +describe("pairwise", () => { + describe("when source emits values", () => { + it("should emit pairs of consecutive values", async () => { + const source = AsyncObservable.from([1, 2, 3, 4]); + const results: [number, number][] = []; + + await source.pipe(pairwise()).subscribe((pair) => { + results.push(pair); + }); + + expect(results).toEqual([ + [1, 2], + [2, 3], + [3, 4], + ]); + }); + + it("should skip the first value (no previous value)", async () => { + const source = AsyncObservable.from(["a", "b", "c"]); + const results: [string, string][] = []; + + await source.pipe(pairwise()).subscribe((pair) => { + results.push(pair); + }); + + // First value "a" is skipped, only pairs ["a", "b"] and ["b", "c"] are emitted + expect(results.length).toBe(2); + expect(results[0][0]).toBe("a"); + expect(results[0][1]).toBe("b"); + }); + + it("should handle any type of value", async () => { + const numSource = AsyncObservable.from([1, 2, 3]); + const strSource = AsyncObservable.from(["a", "b", "c"]); + const objSource = AsyncObservable.from([{ id: 1 }, { id: 2 }, { id: 3 }]); + + const numResults: [number, number][] = []; + const strResults: [string, string][] = []; + const objResults: [{ id: number }, { id: number }][] = []; + + await numSource.pipe(pairwise()).subscribe((pair) => numResults.push(pair)); + await strSource.pipe(pairwise()).subscribe((pair) => strResults.push(pair)); + await objSource.pipe(pairwise()).subscribe((pair) => objResults.push(pair)); + + expect(numResults).toEqual([ + [1, 2], + [2, 3], + ]); + expect(strResults).toEqual([ + ["a", "b"], + ["b", "c"], + ]); + expect(objResults[0][0]).toEqual({ id: 1 }); + expect(objResults[0][1]).toEqual({ id: 2 }); + expect(objResults[1][0]).toEqual({ id: 2 }); + expect(objResults[1][1]).toEqual({ id: 3 }); + }); + + it("should maintain value order in pairs", async () => { + const source = AsyncObservable.from([5, 10, 15, 20]); + const results: [number, number][] = []; + + await source.pipe(pairwise()).subscribe((pair) => { + results.push(pair); + }); + + expect(results[0][0]).toBe(5); // First in first pair is 5 + expect(results[0][1]).toBe(10); // Second in first pair is 10 + expect(results[1][0]).toBe(10); // First in second pair is 10 + expect(results[1][1]).toBe(15); // Second in second pair is 15 + }); + + it("should handle multiple emissions", async () => { + const source = new AsyncObservable(async function* () { + yield 1; + await delay(10); + yield 2; + await delay(10); + yield 3; + await delay(10); + yield 4; + }); + + const results: [number, number][] = []; + await source.pipe(pairwise()).subscribe((pair) => { + results.push(pair); + }); + + expect(results).toEqual([ + [1, 2], + [2, 3], + [3, 4], + ]); + }); + + it("should complete when source completes", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const completionSpy = vi.fn(); + + const sub = source.pipe(pairwise()).subscribe(() => {}); + sub.finally(completionSpy); + + await sub; + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe("when source is empty", () => { + it("should complete without emitting any values", async () => { + const source = AsyncObservable.from([]); + const nextSpy = vi.fn(); + const completionSpy = vi.fn(); + + const sub = source.pipe(pairwise()).subscribe(nextSpy); + sub.finally(completionSpy); + + await sub; + expect(nextSpy).not.toHaveBeenCalled(); + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + + it("should not throw any errors", async () => { + const source = AsyncObservable.from([]); + + let error: Error | null = null; + try { + await source.pipe(pairwise()).subscribe(() => {}); + } catch (e) { + error = e as Error; + } + + expect(error).toBeNull(); + }); + }); + + describe("when source emits only one value", () => { + it("should complete without emitting any values", async () => { + const source = AsyncObservable.from([42]); + const nextSpy = vi.fn(); + const completionSpy = vi.fn(); + + const sub = source.pipe(pairwise()).subscribe(nextSpy); + sub.finally(completionSpy); + + await sub; + expect(nextSpy).not.toHaveBeenCalled(); + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + + it("should not throw any errors", async () => { + const source = AsyncObservable.from([42]); + + let error: Error | null = null; + try { + await source.pipe(pairwise()).subscribe(() => {}); + } catch (e) { + error = e as Error; + } + + expect(error).toBeNull(); + }); + }); + + describe("when source errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + yield 2; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(pairwise()).subscribe(() => {}); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value after error", async () => { + const error = new Error("source error"); + const emittedValues: [number, number][] = []; + + const source = new AsyncObservable(async function* () { + yield 1; + yield 2; + yield 3; + throw error; + yield 4; // Should not be emitted + yield 5; // Should not be emitted + }); + + try { + await source.pipe(pairwise()).subscribe((pair) => { + emittedValues.push(pair); + }); + } catch (e) { + // Expected error + } + + expect(emittedValues).toEqual([ + [1, 2], + [2, 3], + ]); + }); + }); + + describe("performance characteristics", () => { + it("should not buffer values unnecessarily", async () => { + const source = new AsyncObservable(async function* () { + for (let i = 0; i < 1000; i++) { + yield i; + } + }); + + let emitCount = 0; + await source.pipe(pairwise()).subscribe(() => { + emitCount++; + }); + + // One less than source emissions + expect(emitCount).toBe(999); + }); + + it("should maintain memory efficiency", async () => { + const memorySpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + let prev: number | null = null; + for (let i = 0; i < 100; i++) { + yield i; + // Check that only one previous value is stored + memorySpy(prev); + prev = i; + } + }); + + await source.pipe(pairwise()).subscribe(() => {}); + + // Memory spy should have been called with null initially + expect(memorySpy).toHaveBeenCalledWith(null); + + // And then with increasing values as they're processed + for (let i = 0; i < 99; i++) { + expect(memorySpy).toHaveBeenCalledWith(i); + } + }); + }); +}); diff --git a/packages/eventkit/__tests__/operators/reduce.spec.ts b/packages/eventkit/__tests__/operators/reduce.spec.ts index fec5e39..d4b7f45 100644 --- a/packages/eventkit/__tests__/operators/reduce.spec.ts +++ b/packages/eventkit/__tests__/operators/reduce.spec.ts @@ -1,6 +1,7 @@ import { AsyncObservable } from "@eventkit/async-observable"; import { reduce } from "../../lib/operators/reduce"; import { vi, describe, it, expect } from "vitest"; +import { NoValuesError } from "../../lib/utils/errors"; const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); @@ -62,18 +63,39 @@ describe("reduce", () => { const source = AsyncObservable.from([1, 2, 3]); expect(await source.pipe(reduce((acc, value) => acc + value, 5))).toEqual(11); }); + + it("should emit seed value when source emits no values", async () => { + const source = AsyncObservable.from([]); + const seed = 42; + + const result: number[] = []; + await source.pipe(reduce((acc, value) => acc + value, seed)).subscribe((value) => { + result.push(value); + }); + + expect(result).toEqual([42]); + }); + + it("should not call accumulator when source emits no values", async () => { + const source = AsyncObservable.from([]); + const seed = 42; + const accumulatorSpy = vi.fn((acc, value) => acc + value); + + expect(await source.pipe(reduce(accumulatorSpy, seed))).toEqual(seed); + expect(accumulatorSpy).not.toHaveBeenCalled(); + }); }); describe("when no seed value is provided", () => { it("should use first value as initial accumulator value", async () => { const source = AsyncObservable.from([1, 2, 3, 4]); - const accumulatorSpy = vi.fn((acc, value) => (acc || 0) + value); + const accumulatorSpy = vi.fn((acc, value) => acc + value); await source.pipe(reduce(accumulatorSpy)).subscribe(() => {}); - expect(accumulatorSpy).toHaveBeenCalledTimes(4); - expect(accumulatorSpy).toHaveBeenNthCalledWith(1, undefined, 1, 0); - expect(accumulatorSpy).toHaveBeenNthCalledWith(2, 1, 2, 1); + expect(accumulatorSpy).toHaveBeenCalledTimes(3); + expect(accumulatorSpy).toHaveBeenNthCalledWith(1, 1, 2, 1); + expect(accumulatorSpy).toHaveBeenNthCalledWith(2, 3, 3, 2); }); it("should handle undefined initial value", async () => { @@ -98,6 +120,22 @@ describe("reduce", () => { await source.pipe(reduce((acc, value) => (acc || "") + value)) ).toEqual("abc"); }); + + it("should throw NoValuesError when source emits no values", async () => { + const source = AsyncObservable.from([]); + + // Using subscribe + let capturedError: Error | null = null; + try { + await source.pipe(reduce((acc, value) => acc + value)); + } catch (err) { + capturedError = err as Error; + } + expect(capturedError).toBeInstanceOf(NoValuesError); + + // Using singleton object + await expect(source.pipe(reduce((acc, value) => acc + value))).rejects.toThrow(NoValuesError); + }); }); describe("when source emits multiple values", () => { @@ -133,10 +171,9 @@ describe("reduce", () => { ) .subscribe(() => {}); - expect(indexSpy).toHaveBeenCalledTimes(3); - expect(indexSpy).toHaveBeenNthCalledWith(1, 0); - expect(indexSpy).toHaveBeenNthCalledWith(2, 1); - expect(indexSpy).toHaveBeenNthCalledWith(3, 2); + expect(indexSpy).toHaveBeenCalledTimes(2); + expect(indexSpy).toHaveBeenNthCalledWith(1, 1); + expect(indexSpy).toHaveBeenNthCalledWith(2, 2); }); it("should emit final accumulated value using singleton object", async () => { @@ -145,6 +182,8 @@ describe("reduce", () => { await source.pipe(reduce((acc, value) => (acc || "") + value)) ).toEqual("abc"); }); + + it("should handle type conversion during accumulation", async () => {}); }); describe("when source emits no values", () => { @@ -160,13 +199,7 @@ describe("reduce", () => { expect(result).toEqual([seed]); }); - it("should not call predicate if no seed value", async () => { - const source = AsyncObservable.from([]); - const accumulatorSpy = vi.fn((acc, value) => (acc ?? 0) + value); - - await source.pipe(reduce(accumulatorSpy)).subscribe(() => {}); - expect(accumulatorSpy).not.toHaveBeenCalled(); - }); + it("should throw NoValuesError if no seed value", async () => {}); it("should emit seed value using singleton object", async () => { const source = AsyncObservable.from([]); @@ -300,10 +333,9 @@ describe("reduce", () => { await source.pipe(reduce(accumulatorSpy)).subscribe(() => {}); - expect(accumulatorSpy).toHaveBeenCalledTimes(3); - expect(accumulatorSpy).toHaveBeenNthCalledWith(1, undefined, 1, 0); - expect(accumulatorSpy).toHaveBeenNthCalledWith(2, undefined, 2, 1); - expect(accumulatorSpy).toHaveBeenNthCalledWith(3, undefined, 3, 2); + expect(accumulatorSpy).toHaveBeenCalledTimes(2); + expect(accumulatorSpy).toHaveBeenNthCalledWith(1, 1, 2, 1); + expect(accumulatorSpy).toHaveBeenNthCalledWith(2, undefined, 3, 2); }); it("should handle undefined as valid result using singleton object", async () => { @@ -311,4 +343,44 @@ describe("reduce", () => { expect(await source.pipe(reduce(() => undefined))).toEqual(undefined); }); }); + + describe("when source emits single value", () => { + it("should emit value without calling accumulator if no seed", async () => { + const source = AsyncObservable.from([42]); + const accumulatorSpy = vi.fn((acc, value) => acc + value); + + const result: number[] = []; + await source.pipe(reduce(accumulatorSpy)).subscribe((value) => { + result.push(value); + }); + + expect(result).toEqual([42]); + expect(accumulatorSpy).not.toHaveBeenCalled(); + }); + + it("should call accumulator with seed if provided", async () => { + const source = AsyncObservable.from([42]); + const seed = 10; + const accumulatorSpy = vi.fn((acc, value) => acc + value); + + const result: number[] = []; + await source.pipe(reduce(accumulatorSpy, seed)).subscribe((value) => { + result.push(value); + }); + + expect(result).toEqual([52]); // 10 + 42 = 52 + expect(accumulatorSpy).toHaveBeenCalledTimes(1); + expect(accumulatorSpy).toHaveBeenCalledWith(10, 42, 0); + }); + + it("should emit value using singleton object", async () => { + const source = AsyncObservable.from([42]); + + // Without seed + expect(await source.pipe(reduce((acc, value) => acc + value))).toEqual(42); + + // With seed + expect(await source.pipe(reduce((acc, value) => acc + value, 10))).toEqual(52); + }); + }); }); diff --git a/packages/eventkit/__tests__/operators/skip.spec.ts b/packages/eventkit/__tests__/operators/skip.spec.ts new file mode 100644 index 0000000..4efb7b9 --- /dev/null +++ b/packages/eventkit/__tests__/operators/skip.spec.ts @@ -0,0 +1,297 @@ +import { describe, expect, it, vi } from "vitest"; +import { AsyncObservable } from "@eventkit/async-observable"; +import { skip } from "../../lib/operators/skip"; + +describe("skip", () => { + describe("when count is 0", () => { + it("should emit all values from source", async () => { + const source = AsyncObservable.from([1, 2, 3, 4]); + const results: number[] = []; + + await source.pipe(skip(0)).subscribe((value) => { + results.push(value); + }); + + expect(results).toEqual([1, 2, 3, 4]); + }); + + it("should maintain value order", async () => { + const source = AsyncObservable.from(["a", "b", "c", "d"]); + const results: string[] = []; + + await source.pipe(skip(0)).subscribe((value) => { + results.push(value); + }); + + expect(results).toEqual(["a", "b", "c", "d"]); + }); + + it("should complete when source completes", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const completionSpy = vi.fn(); + + const sub = source.pipe(skip(0)).subscribe(() => {}); + sub.finally(completionSpy); + + await sub; + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe("when count is positive", () => { + it("should skip the first N values", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const results: number[] = []; + + await source.pipe(skip(3)).subscribe((value) => { + results.push(value); + }); + + expect(results).not.toContain(1); + expect(results).not.toContain(2); + expect(results).not.toContain(3); + expect(results).toEqual([4, 5]); + }); + + it("should emit remaining values", async () => { + const source = AsyncObservable.from([1, 2, 3, 4, 5]); + const results: number[] = []; + + await source.pipe(skip(2)).subscribe((value) => { + results.push(value); + }); + + expect(results).toEqual([3, 4, 5]); + }); + + it("should maintain value order", async () => { + const source = AsyncObservable.from([10, 20, 30, 40, 50]); + const results: number[] = []; + + await source.pipe(skip(3)).subscribe((value) => { + results.push(value); + }); + + expect(results).toEqual([40, 50]); + }); + + it("should handle any type of value", async () => { + const numSource = AsyncObservable.from([1, 2, 3, 4, 5]); + const strSource = AsyncObservable.from(["a", "b", "c", "d"]); + const objSource = AsyncObservable.from([{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }]); + + const numResults: number[] = []; + const strResults: string[] = []; + const objResults: { id: number }[] = []; + + await numSource.pipe(skip(2)).subscribe((value) => numResults.push(value)); + await strSource.pipe(skip(1)).subscribe((value) => strResults.push(value)); + await objSource.pipe(skip(3)).subscribe((value) => objResults.push(value)); + + expect(numResults).toEqual([3, 4, 5]); + expect(strResults).toEqual(["b", "c", "d"]); + expect(objResults).toEqual([{ id: 4 }]); + }); + + it("should complete when source completes", async () => { + const source = AsyncObservable.from([1, 2, 3, 4]); + const completionSpy = vi.fn(); + + const sub = source.pipe(skip(2)).subscribe(() => {}); + sub.finally(completionSpy); + + await sub; + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe("when count equals source length", () => { + it("should complete without emitting any values", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const nextSpy = vi.fn(); + const completionSpy = vi.fn(); + + const sub = source.pipe(skip(3)).subscribe(nextSpy); + sub.finally(completionSpy); + + await sub; + expect(nextSpy).not.toHaveBeenCalled(); + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + + it("should not throw any errors", async () => { + const source = AsyncObservable.from([1, 2]); + + let error: Error | null = null; + try { + await source.pipe(skip(2)).subscribe(() => {}); + } catch (e) { + error = e as Error; + } + + expect(error).toBeNull(); + }); + }); + + describe("when count exceeds source length", () => { + it("should complete without emitting any values", async () => { + const source = AsyncObservable.from([1, 2, 3]); + const nextSpy = vi.fn(); + const completionSpy = vi.fn(); + + const sub = source.pipe(skip(5)).subscribe(nextSpy); + sub.finally(completionSpy); + + await sub; + expect(nextSpy).not.toHaveBeenCalled(); + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + + it("should not throw any errors", async () => { + const source = AsyncObservable.from([1, 2, 3]); + + let error: Error | null = null; + try { + await source.pipe(skip(10)).subscribe(() => {}); + } catch (e) { + error = e as Error; + } + + expect(error).toBeNull(); + }); + }); + + describe("when count is negative", () => { + it("should emit all values", async () => { + // Since skip is implemented using filter(_, index) => count <= index, + // a negative count will always return true and emit all values + const source = AsyncObservable.from([1, 2, 3, 4]); + const results: number[] = []; + + await source.pipe(skip(-2)).subscribe((value) => { + results.push(value); + }); + + expect(results).toEqual([1, 2, 3, 4]); + }); + + it("should subscribe to source", async () => { + const subscribeSpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + subscribeSpy(); + yield 1; + yield 2; + }); + + await source.pipe(skip(-1)).subscribe(() => {}); + expect(subscribeSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe("when source is empty", () => { + it("should complete without emitting any values", async () => { + const source = AsyncObservable.from([]); + const nextSpy = vi.fn(); + const completionSpy = vi.fn(); + + const sub = source.pipe(skip(2)).subscribe(nextSpy); + sub.finally(completionSpy); + + await sub; + expect(nextSpy).not.toHaveBeenCalled(); + expect(completionSpy).toHaveBeenCalledTimes(1); + }); + + it("should not throw any errors", async () => { + const source = AsyncObservable.from([]); + + let error: Error | null = null; + try { + await source.pipe(skip(5)).subscribe(() => {}); + } catch (e) { + error = e as Error; + } + + expect(error).toBeNull(); + }); + }); + + describe("when source errors", () => { + it("should propagate the error to the subscriber", async () => { + const error = new Error("source error"); + const source = new AsyncObservable(async function* () { + yield 1; + yield 2; + throw error; + }); + + let caughtError: Error | null = null; + try { + await source.pipe(skip(1)).subscribe(() => {}); + } catch (e) { + caughtError = e as Error; + } + + expect(caughtError).toBe(error); + }); + + it("should not emit any value after error", async () => { + const error = new Error("source error"); + const emittedValues: number[] = []; + + const source = new AsyncObservable(async function* () { + yield 1; + yield 2; + yield 3; + throw error; + yield 4; // Should not be emitted + yield 5; // Should not be emitted + }); + + try { + await source.pipe(skip(1)).subscribe((value) => { + emittedValues.push(value); + }); + } catch (e) { + // Expected error + } + + expect(emittedValues).toEqual([2, 3]); + }); + }); + + describe("performance characteristics", () => { + it("should not buffer skipped values", async () => { + const memorySpy = vi.fn(); + + const source = new AsyncObservable(async function* () { + for (let i = 0; i < 100; i++) { + yield i; + memorySpy(i); + } + }); + + await source.pipe(skip(50)).subscribe(() => {}); + + // Verify all values were yielded but not buffered + expect(memorySpy).toHaveBeenCalledTimes(100); + }); + + it("should maintain memory efficiency", async () => { + const source = new AsyncObservable(async function* () { + for (let i = 0; i < 1000; i++) { + yield i; + } + }); + + let emitCount = 0; + await source.pipe(skip(500)).subscribe(() => { + emitCount++; + }); + + // Should only emit the remaining 500 values + expect(emitCount).toBe(500); + }); + }); +}); diff --git a/packages/eventkit/lib/operators/every.ts b/packages/eventkit/lib/operators/every.ts new file mode 100644 index 0000000..88cb446 --- /dev/null +++ b/packages/eventkit/lib/operators/every.ts @@ -0,0 +1,37 @@ +import { singletonFrom } from "../singleton"; +import { type SingletonOperatorFunction, type Falsy } from "../utils/types"; + +/** + * Determines whether all items emitted by the source observable satisfy a specified condition. + * Emits `true` if all values pass the condition, or `false` immediately if any value fails. + * + * Note: If any value fails the predicate, the observable will be cancelled. + * + * @param predicate A function that evaluates each value emitted by the source observable. + * Returns `true` if the value satisfies the condition, `false` otherwise. + * + * @group Operators + */ +export function every( + predicate: (value: T, index: number) => boolean +): SingletonOperatorFunction; +export function every( + predicate: BooleanConstructor +): SingletonOperatorFunction extends never ? false : boolean>; +export function every( + predicate: (value: T, index: number) => boolean +): SingletonOperatorFunction { + return (source) => + singletonFrom( + new source.AsyncObservable(async function* () { + let index = 0; + for await (const value of source) { + if (!predicate(value, index++)) { + yield false; + return; + } + } + yield true; + }) + ); +} diff --git a/packages/eventkit/lib/operators/find.ts b/packages/eventkit/lib/operators/find.ts new file mode 100644 index 0000000..b3be4d0 --- /dev/null +++ b/packages/eventkit/lib/operators/find.ts @@ -0,0 +1,38 @@ +import { singletonFrom } from "../singleton"; +import { type TruthyTypesOf, type SingletonOperatorFunction } from "../utils/types"; + +/** + * Emits the first value emitted by the source observable that satisfies a specified condition. + * If no such value is found, emits `undefined` when the source observable completes. + * + * @param predicate A function that evaluates each value emitted by the source observable. + * Returns `true` if the value satisfies the condition, `false` otherwise. + * + * @group Operators + */ +export function find( + predicate: (value: T, index: number) => value is S +): SingletonOperatorFunction; +export function find( + predicate: (value: T, index: number) => boolean +): SingletonOperatorFunction; +export function find( + predicate: BooleanConstructor +): SingletonOperatorFunction>; +export function find( + predicate: ((value: T, index: number) => boolean) | BooleanConstructor +): SingletonOperatorFunction { + return (source) => + singletonFrom( + new source.AsyncObservable(async function* () { + let index = 0; + for await (const value of source) { + if (typeof predicate === "function" ? predicate(value, index++) : Boolean(value)) { + yield value; + return; + } + } + yield undefined; + }) + ); +} diff --git a/packages/eventkit/lib/operators/findIndex.ts b/packages/eventkit/lib/operators/findIndex.ts new file mode 100644 index 0000000..cf8140a --- /dev/null +++ b/packages/eventkit/lib/operators/findIndex.ts @@ -0,0 +1,36 @@ +import { singletonFrom } from "../singleton"; +import { type Falsy, type SingletonOperatorFunction } from "../utils/types"; + +/** + * Emits the index of the first value emitted by the source observable that satisfies a specified + * condition. If no such value is found, emits `-1` when the source observable completes. + * + * @param predicate A function that evaluates each value emitted by the source observable. + * Returns `true` if the value satisfies the condition, `false` otherwise. + * + * @group Operators + */ +export function findIndex( + predicate: BooleanConstructor +): SingletonOperatorFunction; +export function findIndex( + predicate: (value: T, index: number) => boolean +): SingletonOperatorFunction; +export function findIndex( + predicate: ((value: T, index: number) => boolean) | BooleanConstructor +): SingletonOperatorFunction { + return (source) => + singletonFrom( + new source.AsyncObservable(async function* () { + let index = 0; + for await (const value of source) { + if (typeof predicate === "function" ? predicate(value, index) : Boolean(value)) { + yield index; + return; + } + index++; + } + yield -1; + }) + ); +} diff --git a/packages/eventkit/lib/operators/first.ts b/packages/eventkit/lib/operators/first.ts new file mode 100644 index 0000000..c8de2e6 --- /dev/null +++ b/packages/eventkit/lib/operators/first.ts @@ -0,0 +1,66 @@ +import { singletonFrom } from "../singleton"; +import { NoValuesError } from "../utils/errors"; +import { iife } from "../utils/operators"; +import { type TruthyTypesOf, type SingletonOperatorFunction } from "../utils/types"; + +/** + * Emits the first value emitted by the source observable that satisfies a specified condition. If + * no such value is found when the source observable completes, the `defaultValue` is emitted if + * it's provided. If it isn't, a NoValuesError is thrown. + * + * @throws {NoValuesError} Will throw a `NoValuesError` if no value is found and no default value is provided. + * + * @param predicate A function that evaluates each value emitted by the source observable. + * Returns `true` if the value satisfies the condition, `false` otherwise. + * @param defaultValue The default value returned when no value matches the predicate. + * + * @group Operators + */ +export function first( + predicate?: null, + defaultValue?: D +): SingletonOperatorFunction; +export function first( + predicate: BooleanConstructor +): SingletonOperatorFunction>; +export function first( + predicate: BooleanConstructor, + defaultValue: D +): SingletonOperatorFunction | D>; +export function first( + predicate: (value: T, index: number) => value is S, + defaultValue?: S +): SingletonOperatorFunction; +export function first( + predicate: (value: T, index: number) => value is S, + defaultValue: D +): SingletonOperatorFunction; +export function first( + predicate: (value: T, index: number) => boolean, + defaultValue?: D +): SingletonOperatorFunction; +export function first( + predicate?: ((value: T, index: number) => boolean) | BooleanConstructor | null, + defaultValue?: T +): SingletonOperatorFunction { + const hasDefaultValue = arguments.length >= 2; + return (source) => + singletonFrom( + new source.AsyncObservable(async function* () { + let index = 0; + for await (const value of source) { + const passed = iife(() => { + if (predicate === null || predicate === undefined) return true; + else if (typeof predicate === "function") return predicate(value, index++); + else return Boolean(value); + }); + if (passed) { + yield value; + return; + } + } + if (hasDefaultValue) yield defaultValue; + else throw new NoValuesError(); + }) + ); +} diff --git a/packages/eventkit/lib/operators/index.ts b/packages/eventkit/lib/operators/index.ts index 24d0b15..c25523c 100644 --- a/packages/eventkit/lib/operators/index.ts +++ b/packages/eventkit/lib/operators/index.ts @@ -3,12 +3,22 @@ export * from "./concat"; export * from "./count"; export * from "./dlq"; export * from "./elementAt"; +export * from "./every"; export * from "./filter"; +export * from "./find"; +export * from "./findIndex"; +export * from "./first"; +export * from "./isEmpty"; +export * from "./last"; export * from "./map"; +export * from "./max"; export * from "./merge"; +export * from "./min"; +export * from "./pairwise"; export * from "./partition"; export * from "./pipe"; export * from "./reduce"; export * from "./retry"; +export * from "./skip"; export * from "./takeUntil"; export * from "./withScheduler"; diff --git a/packages/eventkit/lib/operators/isEmpty.ts b/packages/eventkit/lib/operators/isEmpty.ts new file mode 100644 index 0000000..0f4df9a --- /dev/null +++ b/packages/eventkit/lib/operators/isEmpty.ts @@ -0,0 +1,22 @@ +import { singletonFrom } from "../singleton"; +import { type SingletonOperatorFunction } from "../utils/types"; + +/** + * Emits `false` if the source observable emits any values, or emits `true` if the source observable + * completes without emitting any values. + * + * @group Operators + */ +export function isEmpty(): SingletonOperatorFunction { + return (source) => + singletonFrom( + new source.AsyncObservable(async function* () { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of source) { + yield false; + return; + } + yield true; + }) + ); +} diff --git a/packages/eventkit/lib/operators/last.ts b/packages/eventkit/lib/operators/last.ts new file mode 100644 index 0000000..704967b --- /dev/null +++ b/packages/eventkit/lib/operators/last.ts @@ -0,0 +1,69 @@ +import { singletonFrom } from "../singleton"; +import { NoValuesError } from "../utils/errors"; +import { iife } from "../utils/operators"; +import { type SingletonOperatorFunction, type TruthyTypesOf } from "../utils/types"; + +/** + * Emits the last value emitted by the source observable that satisfies a specified condition. If + * no such value is found when the source observable completes, the `defaultValue` is emitted if + * it's provided. If it isn't, a NoValuesError is thrown. + * + * @throws {NoValuesError} Will throw a `NoValuesError` if no value is found and no default value is provided. + * + * @param predicate A function that evaluates each value emitted by the source observable. + * Returns `true` if the value satisfies the condition, `false` otherwise. + * @param defaultValue The default value returned when no value matches the predicate. + * + * @group Operators + */ +export function last( + predicate?: null, + defaultValue?: D +): SingletonOperatorFunction; +export function last( + predicate: BooleanConstructor +): SingletonOperatorFunction>; +export function last( + predicate: BooleanConstructor, + defaultValue: D +): SingletonOperatorFunction | D>; +export function last( + predicate: (value: T, index: number) => value is S, + defaultValue?: S +): SingletonOperatorFunction; +export function last( + predicate: (value: T, index: number) => value is S, + defaultValue: D +): SingletonOperatorFunction; +export function last( + predicate: (value: T, index: number) => boolean, + defaultValue?: D +): SingletonOperatorFunction; +export function last( + predicate?: ((value: T, index: number) => boolean) | BooleanConstructor | null, + defaultValue?: T +): SingletonOperatorFunction { + const hasDefaultValue = arguments.length >= 2; + return (source) => + singletonFrom( + new source.AsyncObservable(async function* () { + let index = 0; + let lastValue: T | undefined; + for await (const value of source) { + const passed = iife(() => { + if (predicate === null || predicate === undefined) return true; + else if (typeof predicate === "function") return predicate(value, index++); + else return Boolean(value); + }); + if (passed) { + lastValue = value; + } + } + if (lastValue === undefined) { + if (hasDefaultValue) yield defaultValue; + else throw new NoValuesError(); + } + yield lastValue; + }) + ); +} diff --git a/packages/eventkit/lib/operators/max.ts b/packages/eventkit/lib/operators/max.ts new file mode 100644 index 0000000..3753b8e --- /dev/null +++ b/packages/eventkit/lib/operators/max.ts @@ -0,0 +1,17 @@ +import { type SingletonOperatorFunction } from "../utils/types"; +import { reduce } from "./reduce"; + +/** + * Emits the maximum value emitted by the source observable. The source observable must emit a + * comparable type (numbers, strings, dates, etc.), or any type when a comparer function is + * provided. + * + * @param comparer A function that compares two values and returns a number; a positive number if the + * first value is greater than the second, a negative number if the first value is less than the + * second, or 0 if they are equal. + * @group Operators + */ +export function max(comparer?: (x: T, y: T) => number): SingletonOperatorFunction { + comparer ??= (x: T, y: T) => (x > y ? 1 : -1); + return reduce((x, y) => (comparer(x, y) > 0 ? x : y)); +} diff --git a/packages/eventkit/lib/operators/min.ts b/packages/eventkit/lib/operators/min.ts new file mode 100644 index 0000000..815b824 --- /dev/null +++ b/packages/eventkit/lib/operators/min.ts @@ -0,0 +1,17 @@ +import { type SingletonOperatorFunction } from "../utils/types"; +import { reduce } from "./reduce"; + +/** + * Emits the minimum value emitted by the source observable. The source observable must emit a + * comparable type (numbers, strings, dates, etc.), or any type when a comparer function is + * provided. + * + * @param comparer A function that compares two values and returns a number; a positive number if the + * first value is greater than the second, a negative number if the first value is less than the + * second, or 0 if they are equal. + * @group Operators + */ +export function min(comparer?: (x: T, y: T) => number): SingletonOperatorFunction { + comparer ??= (x: T, y: T) => (x > y ? 1 : -1); + return reduce((x, y) => (comparer(x, y) < 0 ? x : y)); +} diff --git a/packages/eventkit/lib/operators/pairwise.ts b/packages/eventkit/lib/operators/pairwise.ts new file mode 100644 index 0000000..5989ff5 --- /dev/null +++ b/packages/eventkit/lib/operators/pairwise.ts @@ -0,0 +1,26 @@ +import { type OperatorFunction } from "@eventkit/async-observable"; + +/** + * Groups pairs of consecutive emissions together and emits them as a tuple of two values. In other + * words, it will take the current value and the previous value and emit them as a pair. + * + * Note: Because pairwise only emits complete pairs, the first emission from the source observable + * will be "skipped" since there is no previous value. This means that the output observable will + * always have one less emission than the source observable. + * + * @group Operators + */ +export function pairwise(): OperatorFunction { + return (source) => + new source.AsyncObservable(async function* () { + let prev: T | undefined; + for await (const value of source) { + if (prev === undefined) { + prev = value; + continue; + } + yield [prev, value]; + prev = value; + } + }); +} diff --git a/packages/eventkit/lib/operators/reduce.ts b/packages/eventkit/lib/operators/reduce.ts index 44c047d..affd373 100644 --- a/packages/eventkit/lib/operators/reduce.ts +++ b/packages/eventkit/lib/operators/reduce.ts @@ -1,6 +1,10 @@ import { singletonFrom } from "../singleton"; +import { NoValuesError } from "../utils/errors"; import { type SingletonOperatorFunction } from "../utils/types"; +const kUndefinedReducerValue = Symbol("undefinedReducerValue"); +type UndefinedReducerValue = typeof kUndefinedReducerValue; + /** * Applies an accumulator function over the source generator, and returns the * accumulated result when the source completes, given an optional seed value. @@ -9,35 +13,61 @@ import { type SingletonOperatorFunction } from "../utils/types"; * [Array.prototype.reduce()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce), * `reduce` applies an `accumulator` function against an accumulation and each * value emitted by the source generator to reduce it to a single value, emitted - * on the output generator. Note that `reduce` will only emit one value, only - * when the source generator completes. + * on the output generator. This operator also behaves similarly to the reduce method in terms of + * how it handles the seed value (or initialValue): + * + * * If no seed value is provided and the observable emits more than one value, the accumulator + * function will be called for each value starting with the second value, where the first value is + * used as the initial accumulator value. This means that the accumulator function will be called + * N-1 times where N is the number of values emitted by the source generator. + * + * * If the seed value is provided and the observable emits any values, then the accumulator + * function will always be called starting with the first emission from the source generator. + * + * * If the observable only emits one value and no seed value is provided, or if the seed value is + * provided but the observable doesn't emit any values, the solo value will be emitted without + * any calls to the accumulator function. + * + * * If the observable emits no values and a seed value is provided, the seed value will be emitted + * without any calls to the accumulator function. + * + * * If the observable emits no values and no seed value is provided, the operator will throw a + * `NoValuesError` on completion. + * + * @throws {NoValuesError} If the observable emits no values and no seed value is provided. * * @param accumulator The accumulator function called on each source value. * @param seed The initial accumulation value. + * * @group Operators */ +export function reduce( + accumulator: (acc: A | V, value: V, index: number) => A +): SingletonOperatorFunction; export function reduce( accumulator: (acc: A, value: V, index: number) => A, seed: A ): SingletonOperatorFunction; -export function reduce( - accumulator: (acc: A | undefined, value: V, index: number) => A, - seed?: A +export function reduce( + accumulator: (acc: A | S, value: V, index: number) => A, + seed: S ): SingletonOperatorFunction; export function reduce( - accumulator: (acc: A | undefined, value: V, index: number) => A, + accumulator: (acc: A | V, value: V, index: number) => A, seed?: A -): SingletonOperatorFunction { - const hasSeed = arguments.length >= 2; +): SingletonOperatorFunction { return (source) => singletonFrom( - new source.AsyncObservable(async function* (this: any) { - let acc = hasSeed ? seed : undefined; + new source.AsyncObservable(async function* () { + let acc: A | V | UndefinedReducerValue = seed ?? kUndefinedReducerValue; let index = 0; for await (const value of source) { - acc = accumulator(acc as A | undefined, value, index++); + if (acc === kUndefinedReducerValue) acc = value; + else acc = accumulator(acc, value, index); + index++; } - yield acc as A; + if (acc === kUndefinedReducerValue) throw new NoValuesError(); + yield acc; }) ); } diff --git a/packages/eventkit/lib/operators/skip.ts b/packages/eventkit/lib/operators/skip.ts new file mode 100644 index 0000000..dad92e2 --- /dev/null +++ b/packages/eventkit/lib/operators/skip.ts @@ -0,0 +1,14 @@ +import { type OperatorFunction } from "@eventkit/async-observable"; + +import { filter } from "./filter"; + +/** + * Returns an observable that skips the first `count` values emitted by the source observable. + * + * @param count The number of values to skip. + * + * @group Operators + */ +export function skip(count: number): OperatorFunction { + return filter((_, index) => count <= index); +} diff --git a/packages/eventkit/lib/utils/errors.ts b/packages/eventkit/lib/utils/errors.ts index e6a59d3..d87ed74 100644 --- a/packages/eventkit/lib/utils/errors.ts +++ b/packages/eventkit/lib/utils/errors.ts @@ -29,7 +29,7 @@ export class InvalidConcurrencyLimitError extends Error { } /** - * An error thrown when an observable completes without emitting any values. + * An error thrown when an observable completes without emitting any valid values. * * @group Errors */ diff --git a/packages/eventkit/lib/utils/operators.ts b/packages/eventkit/lib/utils/operators.ts index 7ea9927..ea91df3 100644 --- a/packages/eventkit/lib/utils/operators.ts +++ b/packages/eventkit/lib/utils/operators.ts @@ -15,3 +15,13 @@ export function not( ): (value: T, index: number) => boolean { return (value: T, index: number) => !pred(value, index); } + +/** + * Immediately invokes a function and returns its result. + * + * @param fn - The function to invoke + * @returns The result of the function + */ +export function iife(fn: () => T): T { + return fn(); +}