Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .changeset/fine-parents-serve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
"@eventkit/base": minor
---

Introduces `SingletonAsyncObservable`; a utility class for observables that lets you access the value emitted by observables that emit one (and only one) value (like the observables returned from `reduce()`, `count()`, etc.) using native await syntax.

This makes the consumption of these single value operators a little bit more readable. For instance:

```ts
const obs = AsyncObservable.from([1, 2, 3]);
const singleton = obs.pipe(first());

// instead of this:
let firstValue: number | undefined;
await obs.subscribe((value) => {
firstValue = value;
});
console.log(firstValue); // 1

// you can just do this:
console.log(await singleton); // 1
```
29 changes: 29 additions & 0 deletions docs/guide/concepts/transforming-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,35 @@ await processed$.subscribe(console.log);
// -> 4 (after reduce, 4 + 0 = 4)
```

## Singleton Operators

The output of some operators is an observable that only emits a single value. For those special cases, the observable that is returned is what is known as a "singleton" observable. (See [SingletonAsyncObservable](/reference/_eventkit/base/SingletonAsyncObservable))

All a singleton observable does is extend [AsyncObservable](/reference/_eventkit/base/AsyncObservable) class and implement the `PromiseLike` interface, which means that when it's used in an `await` statement (or with the `then` method) it will return a promise that will subscribe to the observable and resolve with the first (and only) value emitted.

This means that when dealing with a singleton observable, all you need to do is `await` the observable to get the emitted value instead of calling `subscribe` and hoisting the value out of the callback.

For example, the [`first`](/reference/_eventkit/base/first) operator is a singleton operator that emits the first value of an observable and then completes.

```ts
import { AsyncObservable, first } from "@eventkit/base";

const obs = AsyncObservable.from([1, 2, 3]);
const singleton = obs.pipe(first());

// instead of this:
let firstValue: number | undefined;
await obs.subscribe((value) => {
firstValue = value;
});
console.log(firstValue); // 1

// you can just do this:
console.log(await singleton); // 1
```

Singleton observables are meant to provide a shorthand for dealing with observables that only emit a single value. They are still observables in every other way, so you can still use methods like `pipe`, `subscribe`, `drain`, etc.

## Available Operators

Eventkit provides a variety of built-in operators to handle common transformations. A complete reference of all operators can be found [here](/reference/operators).
Expand Down
30 changes: 30 additions & 0 deletions packages/eventkit/__tests__/operators/count.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ describe("count", () => {
await sub;
expect(completionSpy).toHaveBeenCalledTimes(1);
});

it("should emit final count using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3]);
expect(await source.pipe(count())).toEqual(3);
});
});

describe("when source emits no values", () => {
Expand All @@ -40,6 +45,11 @@ describe("count", () => {

expect(result).toEqual([0]);
});

it("should emit 0 using singleton object", async () => {
const source = AsyncObservable.from([]);
expect(await source.pipe(count())).toEqual(0);
});
});

describe("when source emits multiple values", () => {
Expand Down Expand Up @@ -72,6 +82,11 @@ describe("count", () => {
expect(indexSpy).toHaveBeenNthCalledWith(2, 1);
expect(indexSpy).toHaveBeenNthCalledWith(3, 2);
});

it("should emit final count using singleton object", async () => {
const source = AsyncObservable.from(["a", "b", "c"]);
expect(await source.pipe(count())).toEqual(3);
});
});

describe("when predicate is provided", () => {
Expand Down Expand Up @@ -116,6 +131,11 @@ describe("count", () => {

expect(result).toEqual([0]); // No values counted
});

it("should pass final count for values that satisfy predicate using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3, 4, 5]);
expect(await source.pipe(count((value) => value % 2 === 0))).toEqual(2);
});
});

describe("when source errors", () => {
Expand All @@ -136,5 +156,15 @@ describe("count", () => {

expect(capturedError).toBe(error);
});

it("should propagate error when using singleton object", async () => {
const error = new Error("test error");
const source = new AsyncObservable(async function* () {
yield 1;
await delay(5);
throw error;
});
await expect(source.pipe(count())).rejects.toThrow(error);
});
});
});
31 changes: 31 additions & 0 deletions packages/eventkit/__tests__/operators/elementAt.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ describe("elementAt", () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
expect(nextSpy).toHaveBeenCalledWith(30);
});

it("should emit value at specified index using singleton object", async () => {
const source = AsyncObservable.from(["a", "b", "c", "d"]);
expect(await source.pipe(elementAt(2))).toEqual("c");
});
});

describe("when index is out of range", () => {
Expand Down Expand Up @@ -65,6 +70,11 @@ describe("elementAt", () => {

expect(capturedError).toBeInstanceOf(ArgumentOutOfRangeError);
});

it("should emit default value if no default value is provided using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3]);
await expect(source.pipe(elementAt(10))).rejects.toThrow(ArgumentOutOfRangeError);
});
});

describe("when index is negative", () => {
Expand Down Expand Up @@ -102,6 +112,11 @@ describe("elementAt", () => {

expect(capturedError).toBeInstanceOf(ArgumentOutOfRangeError);
});

it("should emit default value if no default value is provided using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3]);
await expect(source.pipe(elementAt(5))).rejects.toThrow(ArgumentOutOfRangeError);
});
});

describe("when source errors", () => {
Expand All @@ -122,6 +137,17 @@ describe("elementAt", () => {

expect(capturedError).toBe(error);
});

it("should propagate error when using singleton object", async () => {
const error = new Error("source error");
const source = new AsyncObservable<number>(async function* () {
yield 1;
yield 2;
await delay(5);
throw error;
});
await expect(source.pipe(elementAt(3))).rejects.toThrow(error);
});
});

describe("when source emits multiple values", () => {
Expand Down Expand Up @@ -150,5 +176,10 @@ describe("elementAt", () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
expect(nextSpy).toHaveBeenCalledWith("c");
});

it("should emit value at specified index using singleton object", async () => {
const source = AsyncObservable.from(["a", "b", "c", "d", "e"]);
expect(await source.pipe(elementAt(2))).toEqual("c");
});
});
});
58 changes: 58 additions & 0 deletions packages/eventkit/__tests__/operators/reduce.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ describe("reduce", () => {
await sub;
expect(completionSpy).toHaveBeenCalledTimes(1);
});

it("should emit final accumulated value using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3, 4]);
expect(await source.pipe(reduce((acc, value) => acc + value, 0))).toEqual(10);
});
});

describe("when seed value is provided", () => {
Expand All @@ -52,6 +57,11 @@ describe("reduce", () => {
expect(accumulatorSpy).toHaveBeenCalledTimes(3);
expect(accumulatorSpy).toHaveBeenNthCalledWith(1, 5, 1, 0);
});

it("should emit final accumulated value using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3]);
expect(await source.pipe(reduce((acc, value) => acc + value, 5))).toEqual(11);
});
});

describe("when no seed value is provided", () => {
Expand Down Expand Up @@ -81,6 +91,13 @@ describe("reduce", () => {

expect(result).toEqual(["abc"]);
});

it("should emit final accumulated value using singleton object", async () => {
const source = AsyncObservable.from(["a", "b", "c"]);
expect(
await source.pipe(reduce<string, string>((acc, value) => (acc || "") + value))
).toEqual("abc");
});
});

describe("when source emits multiple values", () => {
Expand Down Expand Up @@ -121,6 +138,13 @@ describe("reduce", () => {
expect(indexSpy).toHaveBeenNthCalledWith(2, 1);
expect(indexSpy).toHaveBeenNthCalledWith(3, 2);
});

it("should emit final accumulated value using singleton object", async () => {
const source = AsyncObservable.from(["a", "b", "c"]);
expect(
await source.pipe(reduce<string, string>((acc, value) => (acc || "") + value))
).toEqual("abc");
});
});

describe("when source emits no values", () => {
Expand All @@ -143,6 +167,11 @@ describe("reduce", () => {
await source.pipe(reduce(accumulatorSpy)).subscribe(() => {});
expect(accumulatorSpy).not.toHaveBeenCalled();
});

it("should emit seed value using singleton object", async () => {
const source = AsyncObservable.from([]);
expect(await source.pipe(reduce((acc, value) => acc + value, 42))).toEqual(42);
});
});

describe("when accumulator throws error", () => {
Expand Down Expand Up @@ -187,6 +216,19 @@ describe("reduce", () => {

expect(nextSpy).not.toHaveBeenCalled();
});

it("should propagate error when using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3]);
const error = new Error("accumulator error");
await expect(
source.pipe(
reduce((acc, value) => {
if (value === 2) throw error;
return acc + value;
}, 0)
)
).rejects.toThrow(error);
});
});

describe("when source errors", () => {
Expand Down Expand Up @@ -227,6 +269,17 @@ describe("reduce", () => {

expect(nextSpy).not.toHaveBeenCalled();
});

it("should propagate error when using singleton object", async () => {
const error = new Error("source error");
const source = new AsyncObservable<number>(async function* () {
yield 1;
yield 2;
await delay(5);
throw error;
});
await expect(source.pipe(reduce((acc, value) => acc + value, 0))).rejects.toThrow(error);
});
});

describe("when accumulator returns undefined", () => {
Expand All @@ -252,5 +305,10 @@ describe("reduce", () => {
expect(accumulatorSpy).toHaveBeenNthCalledWith(2, undefined, 2, 1);
expect(accumulatorSpy).toHaveBeenNthCalledWith(3, undefined, 3, 2);
});

it("should handle undefined as valid result using singleton object", async () => {
const source = AsyncObservable.from([1, 2, 3]);
expect(await source.pipe(reduce(() => undefined))).toEqual(undefined);
});
});
});
Loading