diff --git a/index.d.ts b/index.d.ts index 3635c1b01..b53e54a94 100644 --- a/index.d.ts +++ b/index.d.ts @@ -171,6 +171,7 @@ type JSONLikeValue = type CanPromise = T | Promise; type CanPromiseLike = T | PromiseLike; type CanArray = T | T[]; +type CanIterable = T | Iterable; type CanUndef = T | undefined; type Nullable = T | null | undefined; diff --git a/src/core/event-emitter/README.md b/src/core/event-emitter/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/core/event-emitter/const.ts b/src/core/event-emitter/const.ts new file mode 100644 index 000000000..c604f64c5 --- /dev/null +++ b/src/core/event-emitter/const.ts @@ -0,0 +1,8 @@ +import type { EmitterOptions } from 'core/event-emitter/interface'; + +import defaultEngine from 'core/event-emitter/engines/default'; + +export const defaultOptions: Required = { + engine: defaultEngine, + engineOptions: {} +}; diff --git a/src/core/event-emitter/engines/default.ts b/src/core/event-emitter/engines/default.ts new file mode 100644 index 000000000..346592fca --- /dev/null +++ b/src/core/event-emitter/engines/default.ts @@ -0,0 +1,48 @@ +import { EventEmitter2 } from 'eventemitter2'; + +import type { ConstructorOptions } from 'eventemitter2'; + +import type { EmitterEngine, EmitterEvent } from 'core/event-emitter/interface'; + +/** + * + */ +class Engine extends EventEmitter2 implements EmitterEngine { + /** + * + */ + offAll(event?: CanArray): void { + this.removeAllListeners(event); + } + + /** + * + */ + getEvents(): EmitterEvent[] { + return this.eventNames().flatMap((event) => { + if (Array.isArray(event)) { + return event.map(String); + } + + return String(event); + }); + } + + prepend(event: EmitterEvent, handler: AnyFunction): void { + this.prependListener(event, handler); + } + + /** + * + */ + override hasListeners(event: string): boolean { + return super.hasListeners(event); + } +} + +/** + * + */ +export default function factory(options?: ConstructorOptions): Engine { + return new Engine(options); +} diff --git a/src/core/event-emitter/helpers.ts b/src/core/event-emitter/helpers.ts new file mode 100644 index 000000000..537a318fd --- /dev/null +++ b/src/core/event-emitter/helpers.ts @@ -0,0 +1,8 @@ +import type { EmitterEvent } from 'core/event-emitter/interface'; + +/** + * + */ +export function isEmitterEvent(event: unknown): event is EmitterEvent { + return Object.isString(event); +} diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts new file mode 100644 index 000000000..296fb5cd0 --- /dev/null +++ b/src/core/event-emitter/index.ts @@ -0,0 +1,207 @@ +import { EventEmitter2 } from 'eventemitter2'; + +import { isEmitterEvent } from 'core/event-emitter/helpers'; + +import { defaultOptions } from 'core/event-emitter/const'; + +import Stream from 'core/event-emitter/modules/stream'; + +import type { + + EmitterEngine, + EmitterEngineFactory, + + EmitterOptions, + EmitterEvent, + + EventHandler, + HandlerValues + +} from 'core/event-emitter/interface'; + +export * from 'core/event-emitter/interface'; + +/** + * + */ +export default class EventEmitter { + /** + * + */ + protected readonly options: Required; + + /** + * + */ + protected readonly engine: EmitterEngine; + + /** + * + */ + protected readonly localEmitter: EventEmitter2 = new EventEmitter2(); + + constructor(options: EmitterOptions = {}) { + this.options = Object.mixin(true, {}, defaultOptions, options); + this.engine = this.options.engine(this.options.engineOptions); + } + + /** + * + */ + on(events: CanIterable, handler: EventHandler): void; + + /** + * + */ + on(events: CanIterable): AsyncIterableIterator; + + on(events: CanIterable, handler?: EventHandler): CanVoid> { + const + eventsArr = this.normalizeEvents(events); + + if (handler == null) { + return this.stream(eventsArr); + } + + eventsArr.forEach((event) => this.engine.on(event, handler)); + } + + /** + * + */ + prepend(events: CanIterable, handler: EventHandler): void { + this.normalizeEvents(events).forEach((event) => { + this.engine.prepend(event, handler); + }); + } + + /** + * + */ + once(events: CanIterable, handler: EventHandler): void; + + /** + * + */ + once(events: CanIterable): AsyncIterableIterator; + + /** + * + */ + once(events: CanIterable, handler?: EventHandler): CanVoid> { + const + eventsArr = this.normalizeEvents(events); + + eventsArr.forEach((event) => { + const wrapper: EventHandler = (...params) => { + if (handler == null) { + this.localEmitter.emit(`off.${event}`, {forbid: true}); + } else { + handler(...params); + } + + this.off(event, wrapper); + }; + + this.on(event, wrapper); + }); + + if (handler == null) { + return this.stream(eventsArr); + } + } + + /** + * + */ + any(events: Iterable, handler: EventHandler): void; + + /** + * + */ + any(events: Iterable): AsyncIterableIterator; + + /** + * + */ + any(events: Iterable, handler?: EventHandler): CanVoid> { + const + eventsArr = this.normalizeEvents(events), + handlers = new Map(); + + eventsArr.forEach((event) => { + const wrapper: EventHandler = (...params) => { + handler?.(...params); + + handlers.forEach((localHandler, event) => { + this.off(event, localHandler); + + if (handler == null) { + this.localEmitter.emit(`off.${event}`); + } + }); + }; + + this.on(event, wrapper); + handlers.set(event, wrapper); + }); + + if (handler == null) { + return this.stream(eventsArr); + } + } + + /** + * + */ + off(events?: CanIterable, handler?: EventHandler): void { + const emitOff = (e: string) => { + this.localEmitter.emit(`off.${e}`); + }; + + if (events == null) { + this.engine.getEvents().forEach(emitOff); + + this.engine.offAll(); + + return; + } + + this.normalizeEvents(events).forEach((event) => { + if (handler == null) { + emitOff(event); + + this.engine.offAll(event); + } else { + this.engine.off(event, handler); + + if (!this.engine.hasListeners(event)) { + emitOff(event); + } + } + }); + } + + /** + * + */ + emit(events: CanIterable, ...values: HandlerValues): void { + this.normalizeEvents(events).forEach((event) => { + this.engine.emit(event, ...values); + }); + } + + /** + * + */ + protected normalizeEvents(event: CanIterable): EmitterEvent[] { + return isEmitterEvent(event) ? [event] : [...new Set(event)]; + } + + /** + * + */ + protected stream(eventsArr: EmitterEvent[]): Stream { + return new Stream(this, this.localEmitter, eventsArr); + } +} diff --git a/src/core/event-emitter/interface.ts b/src/core/event-emitter/interface.ts new file mode 100644 index 000000000..ce395ec41 --- /dev/null +++ b/src/core/event-emitter/interface.ts @@ -0,0 +1,81 @@ +import type defaultEngine from 'core/event-emitter/engines/default'; + +/** + * + */ +export interface EmitterOptions { + /** + * + */ + engine?: T; + + /** + * + */ + engineOptions?: InferFactoryParameters; +} + +/** + * + */ +export type EmitterEngineFactory = (params?: Dictionary) => EmitterEngine; + +/** + * + */ +export interface EmitterEngine { + /** + * + */ + on(event: EmitterEvent, handler: AnyFunction): void; + + /** + * + */ + prepend(event: EmitterEvent, handler: AnyFunction): void; + + /** + * + */ + off(event: EmitterEvent, handler: AnyFunction): void; + + /** + * + */ + emit(event: EmitterEvent, ...values: unknown[]): void; + + /** + * + */ + offAll(event?: EmitterEvent): void; + + /** + * + */ + getEvents(): EmitterEvent[]; + + /** + * + */ + hasListeners(event: EmitterEvent): boolean; +} + +/** + * + */ +export type EmitterEvent = string; + +/** + * + */ +export type EventHandler = (...values: HandlerValues) => void; + +/** + * + */ +export type HandlerValues = unknown[]; + +/** + * + */ +type InferFactoryParameters = Parameters[0]; diff --git a/src/core/event-emitter/modules/stream/index.ts b/src/core/event-emitter/modules/stream/index.ts new file mode 100644 index 000000000..3ea6799e4 --- /dev/null +++ b/src/core/event-emitter/modules/stream/index.ts @@ -0,0 +1,207 @@ +import type { EventEmitter2 } from 'eventemitter2'; + +import type { + + QueueChunk, + LocalOptions + +} from 'core/event-emitter/modules/stream/interface'; + +import type EventEmitter from 'core/event-emitter'; + +import type { + + HandlerValues, + + EmitterEvent, + EventHandler + +} from 'core/event-emitter/interface'; + +import { createsAsyncSemaphore } from 'core/event'; + +import { Queue } from 'core/queue'; + +/** + * + */ +export default class Stream implements AsyncIterableIterator { + /** + * + */ + protected readonly emitter: EventEmitter; + + /** + * + */ + protected readonly localEmitter: EventEmitter2; + + /** + * + */ + protected readonly events: EmitterEvent[]; + + /** + * + */ + protected readonly queue: Queue = new Queue(); + + /** + * + */ + protected readonly forbiddenEvents: Set = new Set(); + + /** + * + */ + protected readonly listeners: Map = new Map(); + + /** + * + */ + protected resolvePromise: Nullable<(value: IteratorResult) => void> = null; + + /** + * + */ + protected pendingPromise: Nullable>> = null; + + /** + * + */ + protected isDone: boolean = false; + + /** + * + */ + protected returnAfterQueueIsEmpty: boolean = false; + + constructor(emitter: EventEmitter, localEmitter: EventEmitter2, events: EmitterEvent[]) { + this.emitter = emitter; + this.events = events; + this.localEmitter = localEmitter; + + const terminateStream = () => { + if (this.queue.length > 0) { + this.returnAfterQueueIsEmpty = true; + } else { + void this.return(); + } + }; + + const + semaphore = createsAsyncSemaphore(terminateStream, ...this.events); + + this.events.forEach((event) => { + const handler: EventHandler = (...value) => { + if (this.pendingPromise == null) { + this.queue.push({event, value}); + } else { + this.resolvePromise?.({done: false, value}); + } + }; + + this.listeners.set(event, handler); + + this.emitter.prepend(event, handler); + + this.localEmitter.once(`off.${event}`, (options?: LocalOptions) => { + semaphore(event); + + if (options?.forbid) { + this.forbiddenEvents.add(event); + } + }); + }); + } + + /** + * + */ + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + } + + /** + * + */ + next(): Promise> { + if (this.isDone) { + return this.return(); + } + + if (this.queue.length > 0) { + const + chunk = this.getNextAvailableQueueChunk(); + + if (chunk != null) { + return Promise.resolve({done: false, value: chunk.value}); + } + } + + if (this.returnAfterQueueIsEmpty) { + return this.return(); + } + + return this.pendingPromise ??= createPromise.call(this); + + function createPromise(this: Stream): Promise> { + return new Promise((resolve) => { + this.resolvePromise = (chunk) => { + resolve(chunk); + + this.resolvePromise = null; + this.pendingPromise = null; + }; + }); + } + } + + /** + * + */ + return(): Promise> { + const chunk: IteratorReturnResult = { + done: true, + value: undefined + }; + + if (this.isDone) { + return Promise.resolve(chunk); + } + + this.resolvePromise?.(chunk); + this.offAllListeners(); + + this.queue.clear(); + this.forbiddenEvents.clear(); + this.listeners.clear(); + + this.isDone = true; + + return Promise.resolve(chunk); + } + + /** + * + */ + protected offAllListeners(): void { + this.listeners.forEach((listener, event) => { + this.emitter.off(event, listener); + }); + } + + /** + * + */ + protected getNextAvailableQueueChunk(): Nullable { + let + chunk = this.queue.shift(); + + while (chunk != null && this.forbiddenEvents.has(chunk.event)) { + chunk = this.queue.shift()!; + } + + return chunk; + } +} diff --git a/src/core/event-emitter/modules/stream/interface.ts b/src/core/event-emitter/modules/stream/interface.ts new file mode 100644 index 000000000..b68cf5a59 --- /dev/null +++ b/src/core/event-emitter/modules/stream/interface.ts @@ -0,0 +1,26 @@ +import type { EmitterEvent, HandlerValues } from 'core/event-emitter/interface'; + +/** + * + */ +export interface QueueChunk { + /** + * + */ + value: HandlerValues; + + /** + * + */ + event: EmitterEvent; +} + +/** + * + */ +export interface LocalOptions { + /** + * + */ + forbid?: boolean; +} diff --git a/src/core/event-emitter/spec.ts b/src/core/event-emitter/spec.ts new file mode 100644 index 000000000..0cea80be8 --- /dev/null +++ b/src/core/event-emitter/spec.ts @@ -0,0 +1,178 @@ +import EventEmitter from 'core/event-emitter'; + +describe('core/event-emitter', () => { + describe('subscribes to an event and recieves the emitted data', () => { + describe('until all events are emitted only once', () => { + it('via callback', () => { + const + emitter = new EventEmitter(), + events = ['foo', 'bar']; + + const + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.once(events, (...values) => { + recievedValues.push(...values); + listener(); + }); + + emitter.emit('foo', 1); + expect(listener).toBeCalledTimes(1); + expect(recievedValues).toEqual([1]); + + emitter.emit('bar', 2, 3); + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + + emitter.emit('foo', 4); + emitter.emit('bar', 5, 6); + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + }); + + it('via stream', async () => { + const + emitter = new EventEmitter(), + events = ['foo', 'bar']; + + const + stream = createStream(), + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.emit('foo', 1); + emitter.emit('foo', 2); + + queueMicrotask(() => emitter.emit('bar', 3)); + queueMicrotask(() => emitter.emit('bar', 4)); + + await stream; + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 3]); + + async function createStream(): Promise { + for await (const values of emitter.once(events)) { + recievedValues.push(...values); + listener(); + } + } + }); + }); + + describe('until any of the events is emitted only once', () => { + it('via callback', () => { + const + emitter = new EventEmitter(), + events = ['foo', 'bar']; + + const + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.any(events, (...values) => { + recievedValues.push(...values); + listener(); + }); + + const [first, second] = Math.random() > 0.5 ? events : [...events].reverse(); + + emitter.emit(first, 1); + emitter.emit(first, 2); + emitter.emit(second, 3); + + expect(listener).toBeCalledTimes(1); + expect(recievedValues).toEqual([1]); + }); + + it('via stream', async () => { + const + emitter = new EventEmitter(), + events = ['foo', 'bar']; + + const + stream = createStream(), + listener = jest.fn(), + recievedValues: unknown[] = []; + + const + [first, second] = Math.random() > 0.5 ? events : [...events].reverse(); + + emitter.emit(first, 1); + emitter.emit(first, 2); + emitter.emit(second, 3); + + await stream; + + expect(recievedValues).toEqual([1]); + expect(listener).toBeCalledTimes(1); + + async function createStream(): Promise { + for await (const values of emitter.any(events)) { + recievedValues.push(...values); + listener(); + } + } + }); + }); + + it('subscribing by a callback until it is unsubscribed explicitly', () => { + const + emitter = new EventEmitter(), + e = 'event'; + + const + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.on(e, (...values) => { + recievedValues.push(...values); + listener(); + }); + + emitter.emit(e, 1); + emitter.emit(e, 2, 3); + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + + emitter.off(e); + + emitter.emit(e, 4, 5); + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + }); + + it('subscribing via stream until all listeners to the event are unsubscribed', async () => { + const + emitter = new EventEmitter(), + e = 'event'; + + const + stream = createStream(), + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.emit(e, 1); + emitter.emit(e, 2); + + queueMicrotask(() => emitter.emit(e, 3)); + queueMicrotask(() => emitter.off(e)); + + await stream; + + expect(recievedValues).toEqual([1, 2, 3]); + expect(listener).toBeCalledTimes(3); + + async function createStream(): Promise { + for await (const values of emitter.on(e)) { + recievedValues.push(...values); + listener(); + } + } + }); + }); +});