From ec7027f89dadbf99d8829aa9bc8b7f55ec32ac00 Mon Sep 17 00:00:00 2001 From: geoprv Date: Thu, 23 Mar 2023 18:59:08 +0300 Subject: [PATCH 01/10] feat: added core/event-emitter WIP --- src/core/event-emitter/README.md | 0 src/core/event-emitter/const.ts | 8 ++ src/core/event-emitter/engines/default.ts | 37 +++++++ src/core/event-emitter/index.ts | 91 +++++++++++++++++ src/core/event-emitter/interface.ts | 81 +++++++++++++++ src/core/event-emitter/modules/stream.ts | 119 ++++++++++++++++++++++ src/core/event-emitter/spec.js | 3 + 7 files changed, 339 insertions(+) create mode 100644 src/core/event-emitter/README.md create mode 100644 src/core/event-emitter/const.ts create mode 100644 src/core/event-emitter/engines/default.ts create mode 100644 src/core/event-emitter/index.ts create mode 100644 src/core/event-emitter/interface.ts create mode 100644 src/core/event-emitter/modules/stream.ts create mode 100644 src/core/event-emitter/spec.js diff --git a/src/core/event-emitter/README.md b/src/core/event-emitter/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/core/event-emitter/const.ts b/src/core/event-emitter/const.ts new file mode 100644 index 000000000..c604f64c5 --- /dev/null +++ b/src/core/event-emitter/const.ts @@ -0,0 +1,8 @@ +import type { EmitterOptions } from 'core/event-emitter/interface'; + +import defaultEngine from 'core/event-emitter/engines/default'; + +export const defaultOptions: Required = { + engine: defaultEngine, + engineOptions: {} +}; diff --git a/src/core/event-emitter/engines/default.ts b/src/core/event-emitter/engines/default.ts new file mode 100644 index 000000000..8977b1518 --- /dev/null +++ b/src/core/event-emitter/engines/default.ts @@ -0,0 +1,37 @@ +import { EventEmitter2 } from 'eventemitter2'; + +import type { ConstructorOptions } from 'eventemitter2'; + +import type { EmitterEngine, EmitterEvent } from 'core/event-emitter/interface'; + +/** + * + */ +class Engine extends EventEmitter2 implements EmitterEngine { + /** + * + */ + offAll(event?: CanArray): void { + this.removeAllListeners(event); + } + + /** + * + */ + getEvents(): EmitterEvent[] { + return this.eventNames().map((event) => { + if (Array.isArray(event)) { + return String(event).replaceAll(',', '.'); + } + + return String(event); + }); + } +} + +/** + * + */ +export default function factory(options?: ConstructorOptions): Engine { + return new Engine(options); +} diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts new file mode 100644 index 000000000..b3273ac9b --- /dev/null +++ b/src/core/event-emitter/index.ts @@ -0,0 +1,91 @@ +import type { + + EmitterEngine, + EmitterEngineFactory, + + EmitterOptions, + EmitterEvent, + + EventHandler, + HandlerParameters + +} from 'core/event-emitter/interface'; + +import { defaultOptions } from 'core/event-emitter/const'; + +import Stream from 'core/event-emitter/modules/stream'; + +export * from 'core/event-emitter/interface'; + +/** + * + */ +export default class EventEmitter { + /** + * + */ + protected readonly options: Required; + + /** + * + */ + protected readonly engine: EmitterEngine; + + constructor(options: EmitterOptions = {}) { + this.options = Object.mixin(true, defaultOptions, options); + this.engine = this.options.engine(this.options.engineOptions); + } + + /** + * + */ + on(events: CanArray, handler: EventHandler): void { + this.normalizeEvents(events).forEach((event) => this.engine.on(event, handler)); + } + + /** + * + */ + stream(events: CanArray): AsyncIterableIterator { + return new Stream(this, this.normalizeEvents(events)); + } + + /** + * + */ + off(events?: CanArray, handler?: EventHandler): void { + if (events == null) { + this.engine.getEvents().forEach((event) => this.emit(`off.${event}`)); + + this.engine.offAll(); + + return; + } + + for (const event of this.normalizeEvents(events)) { + this.emit(`off.${event}`); + + if (handler == null) { + this.engine.offAll(event); + } else { + this.engine.off(event, handler); + } + } + } + + /** + * + */ + emit(events: CanArray, data?: unknown): void { + for (const event of this.normalizeEvents(events)) { + this.engine.emit(event, {data, event}); + } + } + + /** + * + */ + protected normalizeEvents(event: CanArray): EmitterEvent[] { + return Array.isArray(event) ? event : [event]; + } +} diff --git a/src/core/event-emitter/interface.ts b/src/core/event-emitter/interface.ts new file mode 100644 index 000000000..ef5d1946f --- /dev/null +++ b/src/core/event-emitter/interface.ts @@ -0,0 +1,81 @@ +import type defaultEngine from 'core/event-emitter/engines/default'; + +/** + * + */ +export interface EmitterOptions { + /** + * + */ + engine?: T; + + /** + * + */ + engineOptions?: InferFactoryParameters; +} + +/** + * + */ +export type EmitterEngineFactory = (params?: Dictionary) => EmitterEngine; + +/** + * + */ +export interface EmitterEngine { + /** + * + */ + on(event: EmitterEvent, handler: AnyFunction): void; + + /** + * + */ + off(event: EmitterEvent, handler: AnyFunction): void; + + /** + * + */ + emit(event: EmitterEvent, data?: unknown): void; + + /** + * + */ + offAll(event?: EmitterEvent): void; + + /** + * + */ + getEvents(): EmitterEvent[]; +} + +/** + * + */ +export type EmitterEvent = string; + +/** + * + */ +export type EventHandler = (params: HandlerParameters) => void; + +/** + * + */ +export interface HandlerParameters { + /** + * + */ + data?: unknown; + + /** + * + */ + event: EmitterEvent; +} + +/** + * + */ +type InferFactoryParameters = Parameters[0]; diff --git a/src/core/event-emitter/modules/stream.ts b/src/core/event-emitter/modules/stream.ts new file mode 100644 index 000000000..601f041f3 --- /dev/null +++ b/src/core/event-emitter/modules/stream.ts @@ -0,0 +1,119 @@ +import type EventEmitter from 'core/event-emitter'; + +import type { HandlerParameters, EmitterEvent, EventHandler } from 'core/event-emitter/interface'; + +/** + * + */ +export default class Stream implements AsyncIterableIterator { + /** + * + */ + protected readonly emitter: EventEmitter; + + /** + * + */ + protected readonly events: Set; + + /** + * + */ + protected resolvePromise: Nullable<(params: IteratorResult) => void> = null; + + /** + * + */ + protected pendingPromise: Nullable>> = null; + + /** + * + */ + protected isDone: boolean = false; + + /** + * + */ + protected handlers: Map = new Map(); + + constructor(emitter: EventEmitter, events: EmitterEvent[]) { + this.emitter = emitter; + this.events = new Set(events); + + for (const event of events) { + const handler: EventHandler = (params) => { + this.resolvePromise?.({done: false, value: params}); + + this.resolvePromise = null; + this.pendingPromise = null; + }; + + this.handlers.set(event, handler); + + this.emitter.on(event, handler); + + this.emitter.on(`off.${event}`, () => { + this.events.delete(event); + this.emitter.off(event, handler); + + if (this.events.size === 0) { + void this.return(); + } + }); + } + } + + /** + * + */ + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + } + + /** + * + */ + next(): Promise> { + if (this.isDone) { + return this.return(); + } + + return this.pendingPromise ??= new Promise((resolve) => this.resolvePromise = resolve); + } + + /** + * + */ + return(): Promise> { + const chunk: IteratorReturnResult = { + done: true, + value: undefined + }; + + if (this.isDone) { + return Promise.resolve(chunk); + } + + this.resolvePromise?.(chunk); + this.offAllHandlers(); + + this.resolvePromise = null; + this.pendingPromise = null; + this.isDone = true; + + return Promise.resolve(chunk); + } + + /** + * + */ + throw(): any { + return null; + } + + protected offAllHandlers(): void { + for (const [event, handler] of this.handlers) { + this.emitter.off(event, handler); + } + } +} diff --git a/src/core/event-emitter/spec.js b/src/core/event-emitter/spec.js new file mode 100644 index 000000000..094fd98f1 --- /dev/null +++ b/src/core/event-emitter/spec.js @@ -0,0 +1,3 @@ +import EventEmitter from 'core/event-emitter'; + +import defaultEngine from 'core/event-emitter/engines/default'; From cd36566a1e7b93728bcf8a358fa459ae4f9b6cda Mon Sep 17 00:00:00 2001 From: geoprv Date: Fri, 24 Mar 2023 07:43:05 +0300 Subject: [PATCH 02/10] refactor: added async semaphore in Stream --- src/core/event-emitter/modules/stream.ts | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/core/event-emitter/modules/stream.ts b/src/core/event-emitter/modules/stream.ts index 601f041f3..562488282 100644 --- a/src/core/event-emitter/modules/stream.ts +++ b/src/core/event-emitter/modules/stream.ts @@ -2,6 +2,8 @@ import type EventEmitter from 'core/event-emitter'; import type { HandlerParameters, EmitterEvent, EventHandler } from 'core/event-emitter/interface'; +import { createsAsyncSemaphore } from 'core/event'; + /** * */ @@ -40,25 +42,24 @@ export default class Stream implements AsyncIterableIterator this.emitter = emitter; this.events = new Set(events); - for (const event of events) { - const handler: EventHandler = (params) => { - this.resolvePromise?.({done: false, value: params}); + const + semaphore = createsAsyncSemaphore(this.return.bind(this), ...this.events); + + for (const event of this.events) { + const resolveCurrentPromise: EventHandler = (value) => { + this.resolvePromise?.({done: false, value}); this.resolvePromise = null; this.pendingPromise = null; }; - this.handlers.set(event, handler); + this.handlers.set(event, resolveCurrentPromise); - this.emitter.on(event, handler); + this.emitter.on(event, resolveCurrentPromise); this.emitter.on(`off.${event}`, () => { - this.events.delete(event); - this.emitter.off(event, handler); - - if (this.events.size === 0) { - void this.return(); - } + this.emitter.off(event, resolveCurrentPromise); + semaphore(event); }); } } From 03f8d894815a4ebfb1776813ccd8074dbab4cf86 Mon Sep 17 00:00:00 2001 From: geoprv Date: Fri, 24 Mar 2023 10:13:52 +0300 Subject: [PATCH 03/10] added 'once' method --- src/core/event-emitter/index.ts | 46 +++++++++++++++++++++--- src/core/event-emitter/modules/stream.ts | 2 +- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts index b3273ac9b..2392f8e41 100644 --- a/src/core/event-emitter/index.ts +++ b/src/core/event-emitter/index.ts @@ -39,15 +39,52 @@ export default class EventEmitter, handler: EventHandler): void { - this.normalizeEvents(events).forEach((event) => this.engine.on(event, handler)); + on(events: CanArray, handler: EventHandler): void; + + /** + * + */ + on(events: CanArray): AsyncIterableIterator; + + on(events: CanArray, handler?: EventHandler): CanVoid> { + events = this.normalizeEvents(events); + + if (handler == null) { + return new Stream(this, events); + } + + events.forEach((event) => this.engine.on(event, handler)); } /** * */ - stream(events: CanArray): AsyncIterableIterator { - return new Stream(this, this.normalizeEvents(events)); + once(events: CanArray, handler: EventHandler): void; + + /** + * + */ + once(events: CanArray): AsyncIterableIterator; + + /** + * + */ + once(events: CanArray, handler?: EventHandler): CanVoid> { + events = this.normalizeEvents(events); + + for (const event of events) { + const wrapper: EventHandler = (params) => { + handler?.(params); + + this.off(event, wrapper); + }; + + this.on(event, wrapper); + } + + if (handler == null) { + return new Stream(this, events); + } } /** @@ -89,3 +126,4 @@ export default class EventEmitter this.emitter.on(event, resolveCurrentPromise); - this.emitter.on(`off.${event}`, () => { + this.emitter.once(`off.${event}`, () => { this.emitter.off(event, resolveCurrentPromise); semaphore(event); }); From cf69dec657b2cdb01cf04166c17fa6cab1003bda Mon Sep 17 00:00:00 2001 From: geoprv Date: Sat, 25 Mar 2023 07:39:16 +0300 Subject: [PATCH 04/10] added 'promisify' method --- src/core/event-emitter/index.ts | 42 ++++++++++++++++------- src/core/event-emitter/interface.ts | 23 +++++++++---- src/core/event-emitter/modules/promise.ts | 26 ++++++++++++++ src/core/event-emitter/modules/stream.ts | 16 ++++----- src/core/event-emitter/spec.js | 3 -- src/core/event-emitter/spec.ts | 15 ++++++++ 6 files changed, 95 insertions(+), 30 deletions(-) create mode 100644 src/core/event-emitter/modules/promise.ts delete mode 100644 src/core/event-emitter/spec.js create mode 100644 src/core/event-emitter/spec.ts diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts index 2392f8e41..20563ce6d 100644 --- a/src/core/event-emitter/index.ts +++ b/src/core/event-emitter/index.ts @@ -7,13 +7,17 @@ import type { EmitterEvent, EventHandler, - HandlerParameters + HandlerValues, + HandlerParameters, + + OffOptions } from 'core/event-emitter/interface'; import { defaultOptions } from 'core/event-emitter/const'; import Stream from 'core/event-emitter/modules/stream'; +import createEmitterPromise from 'core/event-emitter/modules/promise'; export * from 'core/event-emitter/interface'; @@ -44,9 +48,9 @@ export default class EventEmitter): AsyncIterableIterator; + on(events: CanArray): AsyncIterableIterator; - on(events: CanArray, handler?: EventHandler): CanVoid> { + on(events: CanArray, handler?: EventHandler): CanVoid> { events = this.normalizeEvents(events); if (handler == null) { @@ -56,6 +60,13 @@ export default class EventEmitter this.engine.on(event, handler)); } + /** + * + */ + promifisy(events: CanArray): Promise { + return createEmitterPromise(this, this.normalizeEvents(events)); + } + /** * */ @@ -64,17 +75,17 @@ export default class EventEmitter): AsyncIterableIterator; + once(events: CanArray): AsyncIterableIterator; /** * */ - once(events: CanArray, handler?: EventHandler): CanVoid> { + once(events: CanArray, handler?: EventHandler): CanVoid> { events = this.normalizeEvents(events); for (const event of events) { - const wrapper: EventHandler = (params) => { - handler?.(params); + const wrapper: EventHandler = (...params) => { + handler?.(...params); this.off(event, wrapper); }; @@ -90,9 +101,14 @@ export default class EventEmitter, handler?: EventHandler): void { + off(events?: CanArray, handler?: EventHandler, options?: OffOptions): void { + const + {emit = true} = options ?? {}; + if (events == null) { - this.engine.getEvents().forEach((event) => this.emit(`off.${event}`)); + if (emit) { + this.engine.getEvents().forEach((event) => this.emit(`off.${event}`)); + } this.engine.offAll(); @@ -100,7 +116,9 @@ export default class EventEmitter, data?: unknown): void { + emit(events: CanArray, ...params: HandlerValues): void { for (const event of this.normalizeEvents(events)) { - this.engine.emit(event, {data, event}); + this.engine.emit(event, ...params, {event}); } } diff --git a/src/core/event-emitter/interface.ts b/src/core/event-emitter/interface.ts index ef5d1946f..d8bbab0b8 100644 --- a/src/core/event-emitter/interface.ts +++ b/src/core/event-emitter/interface.ts @@ -37,7 +37,7 @@ export interface EmitterEngine { /** * */ - emit(event: EmitterEvent, data?: unknown): void; + emit(event: EmitterEvent, ...params: unknown[]): void; /** * @@ -58,24 +58,33 @@ export type EmitterEvent = string; /** * */ -export type EventHandler = (params: HandlerParameters) => void; +export type EventHandler = (...params: HandlerParameters) => void; /** * */ -export interface HandlerParameters { - /** - * - */ - data?: unknown; +export type HandlerParameters = [...values: HandlerValues, meta: HandlerMeta]; +/** + * + */ +export interface HandlerMeta { /** * */ event: EmitterEvent; } +/** + * + */ +export type HandlerValues = unknown[]; + /** * */ type InferFactoryParameters = Parameters[0]; + +export interface OffOptions { + emit?: boolean; +} diff --git a/src/core/event-emitter/modules/promise.ts b/src/core/event-emitter/modules/promise.ts new file mode 100644 index 000000000..be47371c3 --- /dev/null +++ b/src/core/event-emitter/modules/promise.ts @@ -0,0 +1,26 @@ +import type EventEmitter from 'core/event-emitter'; +import type { HandlerParameters, EmitterEvent } from 'core/event-emitter/interface'; + +import { createsAsyncSemaphore } from 'core/event'; + +export default function createEmitterPromise( + emitter: EventEmitter, + events: EmitterEvent[] +): Promise { + return new Promise((resolve, reject) => { + const + handler = (...params: HandlerParameters) => resolve(params); + + emitter.once(events, handler); + + const + semaphore = createsAsyncSemaphore(reject, ...events); + + for (const event of events) { + emitter.once(`off.${event}`, () => { + emitter.off(event, handler, {emit: false}); + semaphore(event); + }); + } + }); +} diff --git a/src/core/event-emitter/modules/stream.ts b/src/core/event-emitter/modules/stream.ts index 62f15c64d..439aa4243 100644 --- a/src/core/event-emitter/modules/stream.ts +++ b/src/core/event-emitter/modules/stream.ts @@ -1,13 +1,13 @@ import type EventEmitter from 'core/event-emitter'; -import type { HandlerParameters, EmitterEvent, EventHandler } from 'core/event-emitter/interface'; +import type { HandlerValues, EmitterEvent, EventHandler } from 'core/event-emitter/interface'; import { createsAsyncSemaphore } from 'core/event'; /** * */ -export default class Stream implements AsyncIterableIterator { +export default class Stream implements AsyncIterableIterator { /** * */ @@ -21,12 +21,12 @@ export default class Stream implements AsyncIterableIterator /** * */ - protected resolvePromise: Nullable<(params: IteratorResult) => void> = null; + protected resolvePromise: Nullable<(params: IteratorResult) => void> = null; /** * */ - protected pendingPromise: Nullable>> = null; + protected pendingPromise: Nullable>> = null; /** * @@ -46,8 +46,8 @@ export default class Stream implements AsyncIterableIterator semaphore = createsAsyncSemaphore(this.return.bind(this), ...this.events); for (const event of this.events) { - const resolveCurrentPromise: EventHandler = (value) => { - this.resolvePromise?.({done: false, value}); + const resolveCurrentPromise: EventHandler = (...params) => { + this.resolvePromise?.({done: false, value: params}); this.resolvePromise = null; this.pendingPromise = null; @@ -67,14 +67,14 @@ export default class Stream implements AsyncIterableIterator /** * */ - [Symbol.asyncIterator](): AsyncIterableIterator { + [Symbol.asyncIterator](): AsyncIterableIterator { return this; } /** * */ - next(): Promise> { + next(): Promise> { if (this.isDone) { return this.return(); } diff --git a/src/core/event-emitter/spec.js b/src/core/event-emitter/spec.js deleted file mode 100644 index 094fd98f1..000000000 --- a/src/core/event-emitter/spec.js +++ /dev/null @@ -1,3 +0,0 @@ -import EventEmitter from 'core/event-emitter'; - -import defaultEngine from 'core/event-emitter/engines/default'; diff --git a/src/core/event-emitter/spec.ts b/src/core/event-emitter/spec.ts new file mode 100644 index 000000000..84400b735 --- /dev/null +++ b/src/core/event-emitter/spec.ts @@ -0,0 +1,15 @@ +import EventEmitter from 'core/event-emitter'; + +describe('core/event-emitter', () => { + it.only('foo', async () => { + const + ee = new EventEmitter(), + promise = ee.promifisy('foo'); + + ee.emit('foo', 21, {data: 'bla'}); + + const results = await promise; + + expect(results).toEqual([21, {data: 'bla'}, {event: 'foo'}]); + }); +}); From 172f4cde2a0f7537a8c5a46a01c5aee87abb01c0 Mon Sep 17 00:00:00 2001 From: geoprv Date: Sun, 26 Mar 2023 07:31:08 +0300 Subject: [PATCH 05/10] added tests WIP --- index.d.ts | 1 + src/core/event-emitter/engines/default.ts | 7 ++ src/core/event-emitter/helpers.ts | 8 ++ src/core/event-emitter/index.ts | 78 +++++++-------- src/core/event-emitter/interface.ts | 28 ++---- src/core/event-emitter/modules/promise.ts | 26 ----- src/core/event-emitter/modules/stream.ts | 95 ++++++++++++------ src/core/event-emitter/spec.ts | 112 +++++++++++++++++++++- 8 files changed, 229 insertions(+), 126 deletions(-) create mode 100644 src/core/event-emitter/helpers.ts delete mode 100644 src/core/event-emitter/modules/promise.ts diff --git a/index.d.ts b/index.d.ts index 5e19846aa..17c54408b 100644 --- a/index.d.ts +++ b/index.d.ts @@ -171,6 +171,7 @@ type JSONLikeValue = type CanPromise = T | Promise; type CanPromiseLike = T | PromiseLike; type CanArray = T | T[]; +type CanIterable = T | Iterable; type CanUndef = T | undefined; type Nullable = T | null | undefined; diff --git a/src/core/event-emitter/engines/default.ts b/src/core/event-emitter/engines/default.ts index 8977b1518..534d0622d 100644 --- a/src/core/event-emitter/engines/default.ts +++ b/src/core/event-emitter/engines/default.ts @@ -27,6 +27,13 @@ class Engine extends EventEmitter2 implements EmitterEngine { return String(event); }); } + + /** + * + */ + override hasListeners(event: string): boolean { + return super.hasListeners(event); + } } /** diff --git a/src/core/event-emitter/helpers.ts b/src/core/event-emitter/helpers.ts new file mode 100644 index 000000000..537a318fd --- /dev/null +++ b/src/core/event-emitter/helpers.ts @@ -0,0 +1,8 @@ +import type { EmitterEvent } from 'core/event-emitter/interface'; + +/** + * + */ +export function isEmitterEvent(event: unknown): event is EmitterEvent { + return Object.isString(event); +} diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts index 20563ce6d..46bb1e464 100644 --- a/src/core/event-emitter/index.ts +++ b/src/core/event-emitter/index.ts @@ -1,3 +1,9 @@ +import { isEmitterEvent } from 'core/event-emitter/helpers'; + +import { defaultOptions } from 'core/event-emitter/const'; + +import Stream from 'core/event-emitter/modules/stream'; + import type { EmitterEngine, @@ -7,18 +13,10 @@ import type { EmitterEvent, EventHandler, - HandlerValues, - HandlerParameters, - - OffOptions + HandlerValues } from 'core/event-emitter/interface'; -import { defaultOptions } from 'core/event-emitter/const'; - -import Stream from 'core/event-emitter/modules/stream'; -import createEmitterPromise from 'core/event-emitter/modules/promise'; - export * from 'core/event-emitter/interface'; /** @@ -43,47 +41,42 @@ export default class EventEmitter, handler: EventHandler): void; + on(events: CanIterable, handler: EventHandler): void; /** * */ - on(events: CanArray): AsyncIterableIterator; + on(events: CanIterable): AsyncIterableIterator; - on(events: CanArray, handler?: EventHandler): CanVoid> { - events = this.normalizeEvents(events); + on(events: CanIterable, handler?: EventHandler): CanVoid> { + const + eventsArr = this.normalizeEvents(events); if (handler == null) { - return new Stream(this, events); + return new Stream(this, eventsArr); } - events.forEach((event) => this.engine.on(event, handler)); + eventsArr.forEach((event) => this.engine.on(event, handler)); } /** * */ - promifisy(events: CanArray): Promise { - return createEmitterPromise(this, this.normalizeEvents(events)); - } + once(events: CanIterable, handler: EventHandler): void; /** * */ - once(events: CanArray, handler: EventHandler): void; + once(events: CanIterable): AsyncIterableIterator; /** * */ - once(events: CanArray): AsyncIterableIterator; - - /** - * - */ - once(events: CanArray, handler?: EventHandler): CanVoid> { - events = this.normalizeEvents(events); + once(events: CanIterable, handler?: EventHandler): CanVoid> { + const + eventsArr = this.normalizeEvents(events); - for (const event of events) { + for (const event of eventsArr) { const wrapper: EventHandler = (...params) => { handler?.(...params); @@ -94,21 +87,18 @@ export default class EventEmitter, handler?: EventHandler, options?: OffOptions): void { - const - {emit = true} = options ?? {}; + off(events?: CanIterable, handler?: EventHandler): void { + const emitOff = (e: string) => this.emit(`off.${e}`); if (events == null) { - if (emit) { - this.engine.getEvents().forEach((event) => this.emit(`off.${event}`)); - } + this.engine.getEvents().forEach(emitOff); this.engine.offAll(); @@ -116,32 +106,34 @@ export default class EventEmitter, ...params: HandlerValues): void { + emit(events: CanIterable, ...values: HandlerValues): void { for (const event of this.normalizeEvents(events)) { - this.engine.emit(event, ...params, {event}); + this.engine.emit(event, ...values); } } /** * */ - protected normalizeEvents(event: CanArray): EmitterEvent[] { - return Array.isArray(event) ? event : [event]; + protected normalizeEvents(event: CanIterable): EmitterEvent[] { + return isEmitterEvent(event) ? [event] : [...event]; } } - diff --git a/src/core/event-emitter/interface.ts b/src/core/event-emitter/interface.ts index d8bbab0b8..df7da464f 100644 --- a/src/core/event-emitter/interface.ts +++ b/src/core/event-emitter/interface.ts @@ -37,7 +37,7 @@ export interface EmitterEngine { /** * */ - emit(event: EmitterEvent, ...params: unknown[]): void; + emit(event: EmitterEvent, ...values: unknown[]): void; /** * @@ -48,6 +48,11 @@ export interface EmitterEngine { * */ getEvents(): EmitterEvent[]; + + /** + * + */ + hasListeners(event: EmitterEvent): boolean; } /** @@ -58,22 +63,7 @@ export type EmitterEvent = string; /** * */ -export type EventHandler = (...params: HandlerParameters) => void; - -/** - * - */ -export type HandlerParameters = [...values: HandlerValues, meta: HandlerMeta]; - -/** - * - */ -export interface HandlerMeta { - /** - * - */ - event: EmitterEvent; -} +export type EventHandler = (...values: HandlerValues) => void; /** * @@ -84,7 +74,3 @@ export type HandlerValues = unknown[]; * */ type InferFactoryParameters = Parameters[0]; - -export interface OffOptions { - emit?: boolean; -} diff --git a/src/core/event-emitter/modules/promise.ts b/src/core/event-emitter/modules/promise.ts deleted file mode 100644 index be47371c3..000000000 --- a/src/core/event-emitter/modules/promise.ts +++ /dev/null @@ -1,26 +0,0 @@ -import type EventEmitter from 'core/event-emitter'; -import type { HandlerParameters, EmitterEvent } from 'core/event-emitter/interface'; - -import { createsAsyncSemaphore } from 'core/event'; - -export default function createEmitterPromise( - emitter: EventEmitter, - events: EmitterEvent[] -): Promise { - return new Promise((resolve, reject) => { - const - handler = (...params: HandlerParameters) => resolve(params); - - emitter.once(events, handler); - - const - semaphore = createsAsyncSemaphore(reject, ...events); - - for (const event of events) { - emitter.once(`off.${event}`, () => { - emitter.off(event, handler, {emit: false}); - semaphore(event); - }); - } - }); -} diff --git a/src/core/event-emitter/modules/stream.ts b/src/core/event-emitter/modules/stream.ts index 439aa4243..4d1f185a6 100644 --- a/src/core/event-emitter/modules/stream.ts +++ b/src/core/event-emitter/modules/stream.ts @@ -1,9 +1,18 @@ import type EventEmitter from 'core/event-emitter'; -import type { HandlerValues, EmitterEvent, EventHandler } from 'core/event-emitter/interface'; +import type { + + HandlerValues, + + EmitterEvent, + EventHandler + +} from 'core/event-emitter/interface'; import { createsAsyncSemaphore } from 'core/event'; +import { Queue } from 'core/queue'; + /** * */ @@ -16,12 +25,17 @@ export default class Stream implements AsyncIterableIterator { /** * */ - protected readonly events: Set; + protected readonly events: EmitterEvent[]; + + /** + * + */ + protected readonly queue: Queue = new Queue(); /** * */ - protected resolvePromise: Nullable<(params: IteratorResult) => void> = null; + protected resolvePromise: Nullable<(value: IteratorResult) => void> = null; /** * @@ -36,31 +50,26 @@ export default class Stream implements AsyncIterableIterator { /** * */ - protected handlers: Map = new Map(); + protected returnAfterEmptyQueue: boolean = false; constructor(emitter: EventEmitter, events: EmitterEvent[]) { this.emitter = emitter; - this.events = new Set(events); + this.events = events; + + const terminateStream = () => { + if (this.queue.length > 0) { + this.returnAfterEmptyQueue = true; + } else { + void this.return(); + } + }; const - semaphore = createsAsyncSemaphore(this.return.bind(this), ...this.events); + semaphore = createsAsyncSemaphore(terminateStream, ...this.events); for (const event of this.events) { - const resolveCurrentPromise: EventHandler = (...params) => { - this.resolvePromise?.({done: false, value: params}); - - this.resolvePromise = null; - this.pendingPromise = null; - }; - - this.handlers.set(event, resolveCurrentPromise); - - this.emitter.on(event, resolveCurrentPromise); - - this.emitter.once(`off.${event}`, () => { - this.emitter.off(event, resolveCurrentPromise); - semaphore(event); - }); + this.emitter.on(event, this.onEvent); + this.emitter.once(`off.${event}`, () => semaphore(event)); } } @@ -79,7 +88,27 @@ export default class Stream implements AsyncIterableIterator { return this.return(); } - return this.pendingPromise ??= new Promise((resolve) => this.resolvePromise = resolve); + if (this.queue.length > 0) { + const value = this.queue.shift()!; + return Promise.resolve({done: false, value}); + } + + if (this.returnAfterEmptyQueue) { + return this.return(); + } + + return this.pendingPromise ??= createPromise.call(this); + + function createPromise(this: Stream): Promise> { + return new Promise((resolve) => { + this.resolvePromise = (chunk) => { + resolve(chunk); + + this.resolvePromise = null; + this.pendingPromise = null; + }; + }); + } } /** @@ -96,10 +125,9 @@ export default class Stream implements AsyncIterableIterator { } this.resolvePromise?.(chunk); - this.offAllHandlers(); + this.offAllListeners(); + this.queue.clear(); - this.resolvePromise = null; - this.pendingPromise = null; this.isDone = true; return Promise.resolve(chunk); @@ -108,13 +136,18 @@ export default class Stream implements AsyncIterableIterator { /** * */ - throw(): any { - return null; + protected offAllListeners(): void { + this.emitter.off(this.events, this.onEvent); } - protected offAllHandlers(): void { - for (const [event, handler] of this.handlers) { - this.emitter.off(event, handler); + /** + * + */ + protected onEvent: EventHandler = (...value) => { + if (this.pendingPromise == null) { + this.queue.push(value); + } else { + this.resolvePromise?.({done: false, value}); } - } + }; } diff --git a/src/core/event-emitter/spec.ts b/src/core/event-emitter/spec.ts index 84400b735..866064c57 100644 --- a/src/core/event-emitter/spec.ts +++ b/src/core/event-emitter/spec.ts @@ -1,15 +1,117 @@ import EventEmitter from 'core/event-emitter'; describe('core/event-emitter', () => { - it.only('foo', async () => { + describe('executes a listener and recieve the data', () => { + it('every time an event is emitted', () => { + const + ee = new EventEmitter(), + listener = jest.fn(); + + const + recievedValues: unknown[] = []; + + ee.on('foo', (...values) => { + listener(); + recievedValues.push(...values); + }); + + ee.emit('foo', 1); + ee.emit('foo', 2, 3); + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + }); + + it('only once after emitting an event', () => { + const + ee = new EventEmitter(), + listener = jest.fn(); + + const + recievedValues: unknown[] = []; + + ee.once('foo', (...values) => { + listener(); + recievedValues.push(...values); + }); + + ee.emit('foo', 1); + ee.emit('foo', 2, 3); + + expect(listener).toBeCalledTimes(1); + expect(recievedValues).toEqual([1]); + }); + }); + + it('should not execute a listener after removing', () => { const ee = new EventEmitter(), - promise = ee.promifisy('foo'); + listener = jest.fn(); + + ee.on('foo', listener); + + ee.off('foo', listener); + + ee.emit('foo'); + + expect(listener).not.toBeCalled(); + }); + + describe('provides a "stream" interface via AsyncIterableIterator', () => { + // TODO: description + it('works asynchronously', async () => { + const + ee = new EventEmitter(); + + const + listener = jest.fn(), + recievedValues: unknown[] = [], + stream = wrapper(); + + ee.emit('foo', 1); + + queueMicrotask(() => ee.emit('foo', 2, 3)); + + queueMicrotask(() => ee.off('foo')); + + await stream; + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + + async function wrapper(): Promise { + for await (const values of ee.on('foo')) { + recievedValues.push(...values); + listener(); + } + } + }); + + it('works synchronously', async () => { + const + ee = new EventEmitter(); + + const + listener = jest.fn(), + recievedValues: unknown[] = [], + stream = wrapper(); + + ee.emit('foo', 1); + ee.emit('foo', 2, 3); + + ee.off('foo'); - ee.emit('foo', 21, {data: 'bla'}); + await stream; - const results = await promise; + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); - expect(results).toEqual([21, {data: 'bla'}, {event: 'foo'}]); + async function wrapper(): Promise { + for await (const values of ee.on('foo')) { + recievedValues.push(...values); + listener(); + } + } + }); }); }); From 9e86f8d7ef48221784de40920cb6a94f85f34ee8 Mon Sep 17 00:00:00 2001 From: geoprv Date: Mon, 27 Mar 2023 17:06:30 +0300 Subject: [PATCH 06/10] test: better tests --- src/core/event-emitter/spec.ts | 181 ++++++++++++++++++--------------- 1 file changed, 100 insertions(+), 81 deletions(-) diff --git a/src/core/event-emitter/spec.ts b/src/core/event-emitter/spec.ts index 866064c57..c36e9875b 100644 --- a/src/core/event-emitter/spec.ts +++ b/src/core/event-emitter/spec.ts @@ -1,117 +1,136 @@ import EventEmitter from 'core/event-emitter'; describe('core/event-emitter', () => { - describe('executes a listener and recieve the data', () => { - it('every time an event is emitted', () => { - const - ee = new EventEmitter(), - listener = jest.fn(); - - const - recievedValues: unknown[] = []; - - ee.on('foo', (...values) => { - listener(); - recievedValues.push(...values); - }); + describe('subscribes to an event and recieves the emitted data', () => { + describe('subscribing by a callback', () => { + it('until the callback is unsubscribed explicitly', () => { + const + emitter = new EventEmitter(), + e = 'event'; + + const + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.on(e, (...values) => { + recievedValues.push(...values); + listener(); + }); - ee.emit('foo', 1); - ee.emit('foo', 2, 3); + emitter.emit(e, 1); + emitter.emit(e, 2, 3); - expect(listener).toBeCalledTimes(2); - expect(recievedValues).toEqual([1, 2, 3]); - }); + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); - it('only once after emitting an event', () => { - const - ee = new EventEmitter(), - listener = jest.fn(); + emitter.off(e); - const - recievedValues: unknown[] = []; + emitter.emit(e, 4, 5); - ee.once('foo', (...values) => { - listener(); - recievedValues.push(...values); + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); }); - ee.emit('foo', 1); - ee.emit('foo', 2, 3); + it('until the event is emitted only once', () => { + const + emitter = new EventEmitter(), + e = 'event'; + + const + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.once(e, (...values) => { + recievedValues.push(...values); + listener(); + }); - expect(listener).toBeCalledTimes(1); - expect(recievedValues).toEqual([1]); + emitter.emit(e, 1); + emitter.emit(e, 2, 3); + + expect(listener).toBeCalledTimes(1); + expect(recievedValues).toEqual([1]); + }); }); - }); - it('should not execute a listener after removing', () => { - const - ee = new EventEmitter(), - listener = jest.fn(); + describe('subscribing via stream', () => { + it('until all listeners to the event are unsubscribed', async () => { + const + emitter = new EventEmitter(), + e = 'event'; - ee.on('foo', listener); + const + stream = createStream(), + listener = jest.fn(), + recievedValues: unknown[] = []; - ee.off('foo', listener); + emitter.emit(e, 1); + emitter.emit(e, 2); - ee.emit('foo'); + queueMicrotask(() => emitter.emit(e, 3)); + queueMicrotask(() => emitter.off(e)); - expect(listener).not.toBeCalled(); - }); + await stream; - describe('provides a "stream" interface via AsyncIterableIterator', () => { - // TODO: description - it('works asynchronously', async () => { - const - ee = new EventEmitter(); + expect(recievedValues).toEqual([1, 2, 3]); + expect(listener).toBeCalledTimes(3); - const - listener = jest.fn(), - recievedValues: unknown[] = [], - stream = wrapper(); + async function createStream(): Promise { + for await (const values of emitter.on(e)) { + recievedValues.push(...values); + listener(); + } + } + }); - ee.emit('foo', 1); + it('until the event is emitted only once', async () => { + const + emitter = new EventEmitter(), + e = 'event'; - queueMicrotask(() => ee.emit('foo', 2, 3)); + const + stream = createStream(), + listener = jest.fn(), + recievedValues: unknown[] = []; - queueMicrotask(() => ee.off('foo')); + emitter.emit(e, 1); + emitter.emit(e, 2); - await stream; + queueMicrotask(() => emitter.emit(e, 3)); + queueMicrotask(() => emitter.off(e)); - expect(listener).toBeCalledTimes(2); - expect(recievedValues).toEqual([1, 2, 3]); + await stream; - async function wrapper(): Promise { - for await (const values of ee.on('foo')) { - recievedValues.push(...values); - listener(); + expect(recievedValues).toEqual([1]); + expect(listener).toBeCalledTimes(1); + + async function createStream(): Promise { + for await (const values of emitter.once(e)) { + recievedValues.push(...values); + listener(); + } } - } + }); }); + }); - it('works synchronously', async () => { - const - ee = new EventEmitter(); - - const - listener = jest.fn(), - recievedValues: unknown[] = [], - stream = wrapper(); + it.only('foo', async () => { + const + ee = new EventEmitter(), + stream = createStream(); - ee.emit('foo', 1); - ee.emit('foo', 2, 3); + ee.emit('foo', 'foo'); - ee.off('foo'); + queueMicrotask(() => ee.emit('bar', 'bar')); - await stream; + await stream; - expect(listener).toBeCalledTimes(2); - expect(recievedValues).toEqual([1, 2, 3]); + console.log('yoo'); - async function wrapper(): Promise { - for await (const values of ee.on('foo')) { - recievedValues.push(...values); - listener(); - } + async function createStream(): Promise { + for await (const data of ee.once(['foo', 'bar'])) { + console.log(data); } - }); + } }); }); From c05385175ff7393de65c82300a143a98f25bddb7 Mon Sep 17 00:00:00 2001 From: geoprv Date: Mon, 27 Mar 2023 17:07:22 +0300 Subject: [PATCH 07/10] feat: added "any" method --- src/core/event-emitter/index.ts | 61 ++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 4 deletions(-) diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts index 46bb1e464..c5e23d519 100644 --- a/src/core/event-emitter/index.ts +++ b/src/core/event-emitter/index.ts @@ -4,6 +4,8 @@ import { defaultOptions } from 'core/event-emitter/const'; import Stream from 'core/event-emitter/modules/stream'; +import { createsAsyncSemaphore } from 'core/event'; + import type { EmitterEngine, @@ -76,9 +78,17 @@ export default class EventEmitter { - handler?.(...params); + if (handler == null) { + this.emit(`off.${event}`); + } else { + handler(...params); + } this.off(event, wrapper); }; @@ -86,8 +96,50 @@ export default class EventEmitter, handler: EventHandler): void; + + /** + * + */ + any(events: CanIterable): AsyncIterableIterator; + + /** + * + */ + any(events: CanIterable, handler?: EventHandler): CanVoid> { + const + eventsArr = this.normalizeEvents(events), + map = new Map(); + + const + // Creating the stream here so that it can subscribe to the "off.event" event + stream = handler == null ? new Stream(this, eventsArr) : null; + + for (const event of eventsArr) { + const wrapper: EventHandler = (...params) => { + if (handler == null) { + void stream?.return(); + } else { + handler(...params); + } + + map.forEach((handler, event) => this.off(event, handler)); + }; + + this.on(event, wrapper); + map.set(event, wrapper); + } + + if (stream != null) { + return stream; } } @@ -95,7 +147,8 @@ export default class EventEmitter, handler?: EventHandler): void { - const emitOff = (e: string) => this.emit(`off.${e}`); + const + emitOff = (e: string) => this.emit(`off.${e}`); if (events == null) { this.engine.getEvents().forEach(emitOff); From f836ad1ea66a6ee740abdeae7e60fe483ead3d70 Mon Sep 17 00:00:00 2001 From: geoprv Date: Mon, 27 Mar 2023 17:08:08 +0300 Subject: [PATCH 08/10] refactor: simplify code --- src/core/event-emitter/engines/default.ts | 4 ++-- src/core/event-emitter/modules/stream.ts | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/event-emitter/engines/default.ts b/src/core/event-emitter/engines/default.ts index 534d0622d..b17906a36 100644 --- a/src/core/event-emitter/engines/default.ts +++ b/src/core/event-emitter/engines/default.ts @@ -19,9 +19,9 @@ class Engine extends EventEmitter2 implements EmitterEngine { * */ getEvents(): EmitterEvent[] { - return this.eventNames().map((event) => { + return this.eventNames().flatMap((event) => { if (Array.isArray(event)) { - return String(event).replaceAll(',', '.'); + return event.map(String); } return String(event); diff --git a/src/core/event-emitter/modules/stream.ts b/src/core/event-emitter/modules/stream.ts index 4d1f185a6..881a19177 100644 --- a/src/core/event-emitter/modules/stream.ts +++ b/src/core/event-emitter/modules/stream.ts @@ -50,7 +50,7 @@ export default class Stream implements AsyncIterableIterator { /** * */ - protected returnAfterEmptyQueue: boolean = false; + protected returnAfterQueueIsEmpty: boolean = false; constructor(emitter: EventEmitter, events: EmitterEvent[]) { this.emitter = emitter; @@ -58,7 +58,7 @@ export default class Stream implements AsyncIterableIterator { const terminateStream = () => { if (this.queue.length > 0) { - this.returnAfterEmptyQueue = true; + this.returnAfterQueueIsEmpty = true; } else { void this.return(); } @@ -93,7 +93,7 @@ export default class Stream implements AsyncIterableIterator { return Promise.resolve({done: false, value}); } - if (this.returnAfterEmptyQueue) { + if (this.returnAfterQueueIsEmpty) { return this.return(); } From b7a2dcc7e1e4b36a2fa6d3554b3bf314d89f482c Mon Sep 17 00:00:00 2001 From: geoprv Date: Wed, 29 Mar 2023 07:44:27 +0300 Subject: [PATCH 09/10] fix: skip already emitted events in stream queue when emitting "once" --- src/core/event-emitter/index.ts | 72 ++++---- .../modules/{stream.ts => stream/index.ts} | 80 +++++++-- .../event-emitter/modules/stream/interface.ts | 26 +++ src/core/event-emitter/spec.ts | 156 +++++++++++------- 4 files changed, 233 insertions(+), 101 deletions(-) rename src/core/event-emitter/modules/{stream.ts => stream/index.ts} (58%) create mode 100644 src/core/event-emitter/modules/stream/interface.ts diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts index c5e23d519..e70db436d 100644 --- a/src/core/event-emitter/index.ts +++ b/src/core/event-emitter/index.ts @@ -1,11 +1,11 @@ +import { EventEmitter2 } from 'eventemitter2'; + import { isEmitterEvent } from 'core/event-emitter/helpers'; import { defaultOptions } from 'core/event-emitter/const'; import Stream from 'core/event-emitter/modules/stream'; -import { createsAsyncSemaphore } from 'core/event'; - import type { EmitterEngine, @@ -35,6 +35,11 @@ export default class EventEmitter = {}) { this.options = Object.mixin(true, defaultOptions, options); this.engine = this.options.engine(this.options.engineOptions); @@ -55,7 +60,7 @@ export default class EventEmitter this.engine.on(event, handler)); @@ -76,16 +81,13 @@ export default class EventEmitter, handler?: EventHandler): CanVoid> { const - eventsArr = this.normalizeEvents(events); - - const - // Creating the stream here so that it can subscribe to the "off.event" event - stream = handler == null ? new Stream(this, eventsArr) : null; + eventsArr = this.normalizeEvents(events), + stream = this.getStream(handler, eventsArr); - for (const event of eventsArr) { + eventsArr.forEach((event) => { const wrapper: EventHandler = (...params) => { if (handler == null) { - this.emit(`off.${event}`); + this.localEmitter.emit(`off.${event}`, {forbid: true}); } else { handler(...params); } @@ -94,7 +96,7 @@ export default class EventEmitter, handler: EventHandler): void; + any(events: Iterable, handler: EventHandler): void; /** * */ - any(events: CanIterable): AsyncIterableIterator; + any(events: Iterable): AsyncIterableIterator; /** * */ - any(events: CanIterable, handler?: EventHandler): CanVoid> { + any(events: Iterable, handler?: EventHandler): CanVoid> { const eventsArr = this.normalizeEvents(events), - map = new Map(); - - const - // Creating the stream here so that it can subscribe to the "off.event" event - stream = handler == null ? new Stream(this, eventsArr) : null; + handlers = new Map(), + stream = this.getStream(handler, eventsArr); - for (const event of eventsArr) { + eventsArr.forEach((event) => { const wrapper: EventHandler = (...params) => { if (handler == null) { void stream?.return(); @@ -131,12 +130,12 @@ export default class EventEmitter this.off(event, handler)); + handlers.forEach((handler, event) => this.off(event, handler)); }; this.on(event, wrapper); - map.set(event, wrapper); - } + handlers.set(event, wrapper); + }); if (stream != null) { return stream; @@ -147,8 +146,9 @@ export default class EventEmitter, handler?: EventHandler): void { - const - emitOff = (e: string) => this.emit(`off.${e}`); + const emitOff = (e: string) => { + this.localEmitter.emit(`off.${e}`); + }; if (events == null) { this.engine.getEvents().forEach(emitOff); @@ -158,7 +158,7 @@ export default class EventEmitter { if (handler == null) { emitOff(event); @@ -170,23 +170,33 @@ export default class EventEmitter, ...values: HandlerValues): void { - for (const event of this.normalizeEvents(events)) { + this.normalizeEvents(events).forEach((event) => { this.engine.emit(event, ...values); - } + }); } /** * */ protected normalizeEvents(event: CanIterable): EmitterEvent[] { - return isEmitterEvent(event) ? [event] : [...event]; + return isEmitterEvent(event) ? [event] : [...new Set(event)]; + } + + /** + * + */ + protected getStream(handler: Nullable, eventsArr: EmitterEvent[]): Nullable { + if (handler != null) { + return; + } + + return new Stream(this, this.localEmitter, eventsArr); } } diff --git a/src/core/event-emitter/modules/stream.ts b/src/core/event-emitter/modules/stream/index.ts similarity index 58% rename from src/core/event-emitter/modules/stream.ts rename to src/core/event-emitter/modules/stream/index.ts index 881a19177..f9a70f36c 100644 --- a/src/core/event-emitter/modules/stream.ts +++ b/src/core/event-emitter/modules/stream/index.ts @@ -1,3 +1,12 @@ +import type { EventEmitter2 } from 'eventemitter2'; + +import type { + + QueueChunk, + LocalOptions + +} from 'core/event-emitter/modules/stream/interface'; + import type EventEmitter from 'core/event-emitter'; import type { @@ -22,6 +31,11 @@ export default class Stream implements AsyncIterableIterator { */ protected readonly emitter: EventEmitter; + /** + * + */ + protected readonly localEmitter: EventEmitter2; + /** * */ @@ -30,7 +44,17 @@ export default class Stream implements AsyncIterableIterator { /** * */ - protected readonly queue: Queue = new Queue(); + protected readonly queue: Queue = new Queue(); + + /** + * + */ + protected readonly forbiddenEvents: Set = new Set(); + + /** + * + */ + protected readonly listeners: Map = new Map(); /** * @@ -52,9 +76,10 @@ export default class Stream implements AsyncIterableIterator { */ protected returnAfterQueueIsEmpty: boolean = false; - constructor(emitter: EventEmitter, events: EmitterEvent[]) { + constructor(emitter: EventEmitter, localEmitter: EventEmitter2, events: EmitterEvent[]) { this.emitter = emitter; this.events = events; + this.localEmitter = localEmitter; const terminateStream = () => { if (this.queue.length > 0) { @@ -68,8 +93,25 @@ export default class Stream implements AsyncIterableIterator { semaphore = createsAsyncSemaphore(terminateStream, ...this.events); for (const event of this.events) { - this.emitter.on(event, this.onEvent); - this.emitter.once(`off.${event}`, () => semaphore(event)); + const handler: EventHandler = (...value) => { + if (this.pendingPromise == null) { + this.queue.push({event, value}); + } else { + this.resolvePromise?.({done: false, value}); + } + }; + + this.listeners.set(event, handler); + + this.emitter.on(event, handler); + + this.localEmitter.once(`off.${event}`, (options?: LocalOptions) => { + semaphore(event); + + if (options?.forbid) { + this.forbiddenEvents.add(event); + } + }); } } @@ -89,8 +131,12 @@ export default class Stream implements AsyncIterableIterator { } if (this.queue.length > 0) { - const value = this.queue.shift()!; - return Promise.resolve({done: false, value}); + const + chunk = this.getNextAvailableQueueChunk(); + + if (chunk != null) { + return Promise.resolve({done: false, value: chunk.value}); + } } if (this.returnAfterQueueIsEmpty) { @@ -126,7 +172,10 @@ export default class Stream implements AsyncIterableIterator { this.resolvePromise?.(chunk); this.offAllListeners(); + this.queue.clear(); + this.forbiddenEvents.clear(); + this.listeners.clear(); this.isDone = true; @@ -137,17 +186,22 @@ export default class Stream implements AsyncIterableIterator { * */ protected offAllListeners(): void { - this.emitter.off(this.events, this.onEvent); + this.listeners.forEach((listener, event) => { + this.emitter.off(event, listener); + }); } /** * */ - protected onEvent: EventHandler = (...value) => { - if (this.pendingPromise == null) { - this.queue.push(value); - } else { - this.resolvePromise?.({done: false, value}); + protected getNextAvailableQueueChunk(): Nullable { + let + chunk = this.queue.shift(); + + while (chunk != null && this.forbiddenEvents.has(chunk.event)) { + chunk = this.queue.shift()!; } - }; + + return chunk; + } } diff --git a/src/core/event-emitter/modules/stream/interface.ts b/src/core/event-emitter/modules/stream/interface.ts new file mode 100644 index 000000000..b68cf5a59 --- /dev/null +++ b/src/core/event-emitter/modules/stream/interface.ts @@ -0,0 +1,26 @@ +import type { EmitterEvent, HandlerValues } from 'core/event-emitter/interface'; + +/** + * + */ +export interface QueueChunk { + /** + * + */ + value: HandlerValues; + + /** + * + */ + event: EmitterEvent; +} + +/** + * + */ +export interface LocalOptions { + /** + * + */ + forbid?: boolean; +} diff --git a/src/core/event-emitter/spec.ts b/src/core/event-emitter/spec.ts index c36e9875b..4a7861b6c 100644 --- a/src/core/event-emitter/spec.ts +++ b/src/core/event-emitter/spec.ts @@ -2,102 +2,107 @@ import EventEmitter from 'core/event-emitter'; describe('core/event-emitter', () => { describe('subscribes to an event and recieves the emitted data', () => { - describe('subscribing by a callback', () => { - it('until the callback is unsubscribed explicitly', () => { + describe('until all events are emitted only once', () => { + it('via callback', () => { const emitter = new EventEmitter(), - e = 'event'; + events = ['foo', 'bar']; const listener = jest.fn(), recievedValues: unknown[] = []; - emitter.on(e, (...values) => { + emitter.once(events, (...values) => { recievedValues.push(...values); listener(); }); - emitter.emit(e, 1); - emitter.emit(e, 2, 3); + emitter.emit('foo', 1); + expect(listener).toBeCalledTimes(1); + expect(recievedValues).toEqual([1]); + emitter.emit('bar', 2, 3); expect(listener).toBeCalledTimes(2); expect(recievedValues).toEqual([1, 2, 3]); - emitter.off(e); - - emitter.emit(e, 4, 5); + emitter.emit('foo', 4); + emitter.emit('bar', 5, 6); expect(listener).toBeCalledTimes(2); expect(recievedValues).toEqual([1, 2, 3]); }); - it('until the event is emitted only once', () => { + it('via stream', async () => { const emitter = new EventEmitter(), - e = 'event'; + events = ['foo', 'bar']; const + stream = createStream(), listener = jest.fn(), recievedValues: unknown[] = []; - emitter.once(e, (...values) => { - recievedValues.push(...values); - listener(); - }); + emitter.emit('foo', 1); + emitter.emit('foo', 2); - emitter.emit(e, 1); - emitter.emit(e, 2, 3); + queueMicrotask(() => emitter.emit('bar', 3)); + queueMicrotask(() => emitter.off(events)); - expect(listener).toBeCalledTimes(1); - expect(recievedValues).toEqual([1]); + await stream; + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 3]); + + async function createStream(): Promise { + for await (const values of emitter.once(events)) { + recievedValues.push(...values); + listener(); + } + } }); }); - describe('subscribing via stream', () => { - it('until all listeners to the event are unsubscribed', async () => { + describe('until any of the events is emitted only once', () => { + it('via callback', () => { const emitter = new EventEmitter(), - e = 'event'; + events = ['foo', 'bar']; const - stream = createStream(), listener = jest.fn(), recievedValues: unknown[] = []; - emitter.emit(e, 1); - emitter.emit(e, 2); - - queueMicrotask(() => emitter.emit(e, 3)); - queueMicrotask(() => emitter.off(e)); + emitter.any(events, (...values) => { + recievedValues.push(...values); + listener(); + }); - await stream; + const [first, second] = Math.random() > 0.5 ? events : [...events].reverse(); - expect(recievedValues).toEqual([1, 2, 3]); - expect(listener).toBeCalledTimes(3); + emitter.emit(first, 1); + emitter.emit(first, 2); + emitter.emit(second, 3); - async function createStream(): Promise { - for await (const values of emitter.on(e)) { - recievedValues.push(...values); - listener(); - } - } + expect(listener).toBeCalledTimes(1); + expect(recievedValues).toEqual([1]); }); - it('until the event is emitted only once', async () => { + it('via stream', async () => { const emitter = new EventEmitter(), - e = 'event'; + events = ['foo', 'bar']; const stream = createStream(), listener = jest.fn(), recievedValues: unknown[] = []; - emitter.emit(e, 1); - emitter.emit(e, 2); + const + [first, second] = Math.random() > 0.5 ? events : [...events].reverse(); - queueMicrotask(() => emitter.emit(e, 3)); - queueMicrotask(() => emitter.off(e)); + emitter.emit(first, 1); + emitter.emit(first, 2); + emitter.emit(second, 3); await stream; @@ -105,32 +110,69 @@ describe('core/event-emitter', () => { expect(listener).toBeCalledTimes(1); async function createStream(): Promise { - for await (const values of emitter.once(e)) { + for await (const values of emitter.any(events)) { recievedValues.push(...values); listener(); } } }); }); - }); - it.only('foo', async () => { - const - ee = new EventEmitter(), - stream = createStream(); + it('subscribing by a callback until it is unsubscribed explicitly', () => { + const + emitter = new EventEmitter(), + e = 'event'; + + const + listener = jest.fn(), + recievedValues: unknown[] = []; + + emitter.on(e, (...values) => { + recievedValues.push(...values); + listener(); + }); + + emitter.emit(e, 1); + emitter.emit(e, 2, 3); + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + + emitter.off(e); + + emitter.emit(e, 4, 5); + + expect(listener).toBeCalledTimes(2); + expect(recievedValues).toEqual([1, 2, 3]); + }); + + it('subscribing via stream until all listeners to the event are unsubscribed', async () => { + const + emitter = new EventEmitter(), + e = 'event'; + + const + stream = createStream(), + listener = jest.fn(), + recievedValues: unknown[] = []; - ee.emit('foo', 'foo'); + emitter.emit(e, 1); + emitter.emit(e, 2); - queueMicrotask(() => ee.emit('bar', 'bar')); + queueMicrotask(() => emitter.emit(e, 3)); + queueMicrotask(() => emitter.off(e)); - await stream; + await stream; - console.log('yoo'); + expect(recievedValues).toEqual([1, 2, 3]); + expect(listener).toBeCalledTimes(3); - async function createStream(): Promise { - for await (const data of ee.once(['foo', 'bar'])) { - console.log(data); + async function createStream(): Promise { + for await (const values of emitter.on(e)) { + recievedValues.push(...values); + listener(); + } } - } + }); }); }); From ecc4dc9dba2b23119813b910ad5d789085abd46c Mon Sep 17 00:00:00 2001 From: geoprv Date: Sun, 9 Apr 2023 07:26:10 +0300 Subject: [PATCH 10/10] refactor: simplify code --- src/core/event-emitter/engines/default.ts | 4 ++ src/core/event-emitter/index.ts | 47 ++++++++++--------- src/core/event-emitter/interface.ts | 5 ++ .../event-emitter/modules/stream/index.ts | 6 +-- src/core/event-emitter/spec.ts | 2 +- 5 files changed, 39 insertions(+), 25 deletions(-) diff --git a/src/core/event-emitter/engines/default.ts b/src/core/event-emitter/engines/default.ts index b17906a36..346592fca 100644 --- a/src/core/event-emitter/engines/default.ts +++ b/src/core/event-emitter/engines/default.ts @@ -28,6 +28,10 @@ class Engine extends EventEmitter2 implements EmitterEngine { }); } + prepend(event: EmitterEvent, handler: AnyFunction): void { + this.prependListener(event, handler); + } + /** * */ diff --git a/src/core/event-emitter/index.ts b/src/core/event-emitter/index.ts index e70db436d..296fb5cd0 100644 --- a/src/core/event-emitter/index.ts +++ b/src/core/event-emitter/index.ts @@ -41,7 +41,7 @@ export default class EventEmitter = {}) { - this.options = Object.mixin(true, defaultOptions, options); + this.options = Object.mixin(true, {}, defaultOptions, options); this.engine = this.options.engine(this.options.engineOptions); } @@ -60,12 +60,21 @@ export default class EventEmitter this.engine.on(event, handler)); } + /** + * + */ + prepend(events: CanIterable, handler: EventHandler): void { + this.normalizeEvents(events).forEach((event) => { + this.engine.prepend(event, handler); + }); + } + /** * */ @@ -81,8 +90,7 @@ export default class EventEmitter, handler?: EventHandler): CanVoid> { const - eventsArr = this.normalizeEvents(events), - stream = this.getStream(handler, eventsArr); + eventsArr = this.normalizeEvents(events); eventsArr.forEach((event) => { const wrapper: EventHandler = (...params) => { @@ -98,8 +106,8 @@ export default class EventEmitter, handler?: EventHandler): CanVoid> { const eventsArr = this.normalizeEvents(events), - handlers = new Map(), - stream = this.getStream(handler, eventsArr); + handlers = new Map(); eventsArr.forEach((event) => { const wrapper: EventHandler = (...params) => { - if (handler == null) { - void stream?.return(); - } else { - handler(...params); - } + handler?.(...params); + + handlers.forEach((localHandler, event) => { + this.off(event, localHandler); - handlers.forEach((handler, event) => this.off(event, handler)); + if (handler == null) { + this.localEmitter.emit(`off.${event}`); + } + }); }; this.on(event, wrapper); handlers.set(event, wrapper); }); - if (stream != null) { - return stream; + if (handler == null) { + return this.stream(eventsArr); } } @@ -192,11 +201,7 @@ export default class EventEmitter, eventsArr: EmitterEvent[]): Nullable { - if (handler != null) { - return; - } - + protected stream(eventsArr: EmitterEvent[]): Stream { return new Stream(this, this.localEmitter, eventsArr); } } diff --git a/src/core/event-emitter/interface.ts b/src/core/event-emitter/interface.ts index df7da464f..ce395ec41 100644 --- a/src/core/event-emitter/interface.ts +++ b/src/core/event-emitter/interface.ts @@ -29,6 +29,11 @@ export interface EmitterEngine { */ on(event: EmitterEvent, handler: AnyFunction): void; + /** + * + */ + prepend(event: EmitterEvent, handler: AnyFunction): void; + /** * */ diff --git a/src/core/event-emitter/modules/stream/index.ts b/src/core/event-emitter/modules/stream/index.ts index f9a70f36c..3ea6799e4 100644 --- a/src/core/event-emitter/modules/stream/index.ts +++ b/src/core/event-emitter/modules/stream/index.ts @@ -92,7 +92,7 @@ export default class Stream implements AsyncIterableIterator { const semaphore = createsAsyncSemaphore(terminateStream, ...this.events); - for (const event of this.events) { + this.events.forEach((event) => { const handler: EventHandler = (...value) => { if (this.pendingPromise == null) { this.queue.push({event, value}); @@ -103,7 +103,7 @@ export default class Stream implements AsyncIterableIterator { this.listeners.set(event, handler); - this.emitter.on(event, handler); + this.emitter.prepend(event, handler); this.localEmitter.once(`off.${event}`, (options?: LocalOptions) => { semaphore(event); @@ -112,7 +112,7 @@ export default class Stream implements AsyncIterableIterator { this.forbiddenEvents.add(event); } }); - } + }); } /** diff --git a/src/core/event-emitter/spec.ts b/src/core/event-emitter/spec.ts index 4a7861b6c..0cea80be8 100644 --- a/src/core/event-emitter/spec.ts +++ b/src/core/event-emitter/spec.ts @@ -46,7 +46,7 @@ describe('core/event-emitter', () => { emitter.emit('foo', 2); queueMicrotask(() => emitter.emit('bar', 3)); - queueMicrotask(() => emitter.off(events)); + queueMicrotask(() => emitter.emit('bar', 4)); await stream;