diff --git a/packages/node-sdk/README.md b/packages/node-sdk/README.md index 63441612..cff744dc 100644 --- a/packages/node-sdk/README.md +++ b/packages/node-sdk/README.md @@ -315,17 +315,21 @@ the number of calls that are sent to Bucket's servers. During process shutdown, some messages could be waiting to be sent, and thus, would be discarded if the buffer is not flushed. -A naive example: +By default, the SDK automatically subscribes to process exit signals and attempts to flush +any pending events. This behavior is controlled by the `flushOnExit` option in the client configuration: ```typescript -process.on("SIGINT", () => { - console.log("Flushing batch buffer..."); - client.flush().then(() => { - process.exit(0); - }); +const client = new BucketClient({ + batchOptions: { + flushOnExit: false, // disable automatic flushing on exit + }, }); ``` +> [!NOTE] +> If you are creating multiple client instances in your application, it's recommended to disable `flushOnExit` +> to avoid potential conflicts during process shutdown. In such cases, you should implement your own flush handling. + When you bind a client to a user/company, this data is matched against the targeting rules. To get accurate targeting, you must ensure that the user/company information provided is sufficient to match against the targeting rules you've diff --git a/packages/node-sdk/example/bucket.ts b/packages/node-sdk/example/bucket.ts index 6aebcfd8..37add14d 100644 --- a/packages/node-sdk/example/bucket.ts +++ b/packages/node-sdk/example/bucket.ts @@ -14,11 +14,6 @@ let featureOverrides = (context: Context): FeatureOverrides => { return { "delete-todos": true }; // feature keys checked at compile time }; -let host = undefined; -if (process.env.BUCKET_HOST) { - host = process.env.BUCKET_HOST; -} - // Create a new BucketClient instance with the secret key and default features // The default features will be used if the user does not have any features set // Create a bucketConfig.json file to configure the client or set environment variables diff --git a/packages/node-sdk/example/serve.ts b/packages/node-sdk/example/serve.ts index b4abd9d9..ada781f5 100644 --- a/packages/node-sdk/example/serve.ts +++ b/packages/node-sdk/example/serve.ts @@ -1,3 +1,5 @@ +import "dotenv/config"; + import bucket from "./bucket"; import app from "./app"; diff --git a/packages/node-sdk/package.json b/packages/node-sdk/package.json index 81bde285..3ba99cea 100644 --- a/packages/node-sdk/package.json +++ b/packages/node-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@bucketco/node-sdk", - "version": "1.5.2", + "version": "1.5.3", "license": "MIT", "repository": { "type": "git", diff --git a/packages/node-sdk/src/client.ts b/packages/node-sdk/src/client.ts index 7fb53be9..967fc211 100644 --- a/packages/node-sdk/src/client.ts +++ b/packages/node-sdk/src/client.ts @@ -14,6 +14,7 @@ import { SDK_VERSION_HEADER_NAME, } from "./config"; import fetchClient from "./fetch-http-client"; +import { subscribe as triggerOnExit } from "./flusher"; import { newRateLimiter } from "./rate-limiter"; import type { EvaluatedFeaturesAPIResponse, @@ -112,6 +113,7 @@ export class BucketClient { offline: boolean; configFile?: string; }; + private _initialize = once(async () => { if (!this._config.offline) { await this.getFeaturesCache().refresh(); @@ -239,6 +241,10 @@ export class BucketClient { : () => config.featureOverrides, }; + if ((config.batchOptions?.flushOnExit ?? true) && !this._config.offline) { + triggerOnExit(() => this.flush()); + } + if (!new URL(this._config.apiBaseUrl).pathname.endsWith("/")) { this._config.apiBaseUrl += "/"; } @@ -443,8 +449,14 @@ export class BucketClient { * @remarks * It is recommended to call this method when the application is shutting down to ensure all events are sent * before the process exits. + * + * This method is automatically called when the process exits if `batchOptions.flushOnExit` is `true` in the options (default). */ public async flush() { + if (this._config.offline) { + return; + } + await this._config.batchBuffer.flush(); } diff --git a/packages/node-sdk/src/config.ts b/packages/node-sdk/src/config.ts index 503629c2..c470300e 100644 --- a/packages/node-sdk/src/config.ts +++ b/packages/node-sdk/src/config.ts @@ -9,6 +9,7 @@ export const API_BASE_URL = "https://front.bucket.co"; export const SDK_VERSION_HEADER_NAME = "bucket-sdk-version"; export const SDK_VERSION = `node-sdk/${version}`; export const API_TIMEOUT_MS = 5000; +export const END_FLUSH_TIMEOUT_MS = 5000; export const BUCKET_LOG_PREFIX = "[Bucket]"; diff --git a/packages/node-sdk/src/flusher.ts b/packages/node-sdk/src/flusher.ts new file mode 100644 index 00000000..47e4937d --- /dev/null +++ b/packages/node-sdk/src/flusher.ts @@ -0,0 +1,62 @@ +import { constants } from "os"; + +import { END_FLUSH_TIMEOUT_MS } from "./config"; +import { TimeoutError, withTimeout } from "./utils"; + +type Callback = () => Promise; + +const killSignals = ["SIGINT", "SIGTERM", "SIGHUP", "SIGBREAK"] as const; + +export function subscribe( + callback: Callback, + timeout: number = END_FLUSH_TIMEOUT_MS, +) { + let state: boolean | undefined; + + const wrappedCallback = async () => { + if (state !== undefined) { + return; + } + + state = false; + + try { + await withTimeout(callback(), timeout); + } catch (error) { + if (error instanceof TimeoutError) { + console.error( + "[Bucket SDK] Timeout while flushing events on process exit.", + ); + } else { + console.error( + "[Bucket SDK] An error occurred while flushing events on process exit.", + error, + ); + } + } + + state = true; + }; + + killSignals.forEach((signal) => { + const hasListeners = process.listenerCount(signal) > 0; + + if (hasListeners) { + process.prependListener(signal, wrappedCallback); + } else { + process.on(signal, async () => { + await wrappedCallback(); + process.exit(0x80 + constants.signals[signal]); + }); + } + }); + + process.on("beforeExit", wrappedCallback); + process.on("exit", () => { + if (!state) { + console.error( + "[Bucket SDK] Failed to finalize the flushing of events on process exit.", + ); + } + }); +} diff --git a/packages/node-sdk/src/types.ts b/packages/node-sdk/src/types.ts index b54c3e8b..291cb978 100644 --- a/packages/node-sdk/src/types.ts +++ b/packages/node-sdk/src/types.ts @@ -288,13 +288,24 @@ export type BatchBufferOptions = { /** * The maximum size of the buffer before it is flushed. + * + * @defaultValue `100` **/ maxSize?: number; /** * The interval in milliseconds at which the buffer is flushed. + * + * @defaultValue `1000` **/ intervalMs?: number; + + /** + * Whether to flush the buffer on exit. + * + * @defaultValue `true` + */ + flushOnExit?: boolean; }; /** diff --git a/packages/node-sdk/src/utils.ts b/packages/node-sdk/src/utils.ts index e2ec10c9..150714da 100644 --- a/packages/node-sdk/src/utils.ts +++ b/packages/node-sdk/src/utils.ts @@ -175,3 +175,44 @@ export function once ReturnType>( return returned; }; } + +export class TimeoutError extends Error { + constructor(timeoutMs: number) { + super(`Operation timed out after ${timeoutMs}ms`); + this.name = "TimeoutError"; + } +} + +/** + * Wraps a promise with a timeout. If the promise doesn't resolve within the specified + * timeout, it will reject with a timeout error. The original promise will still + * continue to execute but its result will be ignored. + * + * @param promise - The promise to wrap with a timeout + * @param timeoutMs - The timeout in milliseconds + * @returns A promise that resolves with the original promise result or rejects with a timeout error + * @throws {Error} If the timeout is reached before the promise resolves + **/ +export function withTimeout( + promise: Promise, + timeoutMs: number, +): Promise { + ok(timeoutMs > 0, "timeout must be a positive number"); + + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new TimeoutError(timeoutMs)); + }, timeoutMs); + + promise + .then((result) => { + resolve(result); + }) + .catch((error) => { + reject(error); + }) + .finally(() => { + clearTimeout(timeoutId); + }); + }); +} diff --git a/packages/node-sdk/test/client.test.ts b/packages/node-sdk/test/client.test.ts index a34dcd32..aab0be77 100644 --- a/packages/node-sdk/test/client.test.ts +++ b/packages/node-sdk/test/client.test.ts @@ -23,6 +23,7 @@ import { SDK_VERSION_HEADER_NAME, } from "../src/config"; import fetchClient from "../src/fetch-http-client"; +import { subscribe as triggerOnExit } from "../src/flusher"; import { newRateLimiter } from "../src/rate-limiter"; import { ClientOptions, Context, FeaturesAPIResponse } from "../src/types"; @@ -45,6 +46,10 @@ vi.mock("../src/rate-limiter", async (importOriginal) => { }; }); +vi.mock("../src/flusher", () => ({ + subscribe: vi.fn(), +})); + const user = { id: "user123", age: 1, @@ -82,6 +87,7 @@ const validOptions: ClientOptions = { batchOptions: { maxSize: 99, intervalMs: 100, + flushOnExit: false, }, offline: false, }; @@ -300,6 +306,36 @@ describe("BucketClient", () => { ); }); + it("should not register an exit flush handler if `batchOptions.flushOnExit` is false", () => { + new BucketClient({ + ...validOptions, + batchOptions: { ...validOptions.batchOptions, flushOnExit: false }, + }); + + expect(triggerOnExit).not.toHaveBeenCalled(); + }); + + it("should not register an exit flush handler if `offline` is true", () => { + new BucketClient({ + ...validOptions, + offline: true, + }); + + expect(triggerOnExit).not.toHaveBeenCalled(); + }); + + it.each([undefined, true])( + "should register an exit flush handler if `batchOptions.flushOnExit` is `%s`", + (flushOnExit) => { + new BucketClient({ + ...validOptions, + batchOptions: { ...validOptions.batchOptions, flushOnExit }, + }); + + expect(triggerOnExit).toHaveBeenCalledWith(expect.any(Function)); + }, + ); + it.each([ ["https://api.example.com", "https://api.example.com/bulk"], ["https://api.example.com/", "https://api.example.com/bulk"], @@ -899,6 +935,18 @@ describe("BucketClient", () => { ], ); }); + + it("should not flush all bulk data if `offline` is true", async () => { + const client = new BucketClient({ + ...validOptions, + offline: true, + }); + + await client.updateUser(user.id, { attributes: { age: 2 } }); + await client.flush(); + + expect(httpClient.post).not.toHaveBeenCalled(); + }); }); describe("getFeature", () => { diff --git a/packages/node-sdk/test/flusher.test.ts b/packages/node-sdk/test/flusher.test.ts new file mode 100644 index 00000000..e6b09511 --- /dev/null +++ b/packages/node-sdk/test/flusher.test.ts @@ -0,0 +1,210 @@ +import { constants } from "os"; +import { + afterEach, + beforeEach, + describe, + expect, + it, + MockInstance, + vi, +} from "vitest"; + +import { subscribe } from "../src/flusher"; + +describe("flusher", () => { + const mockExit = vi + .spyOn(process, "exit") + .mockImplementation((() => undefined) as any); + + const mockConsoleError = vi + .spyOn(console, "error") + .mockImplementation(() => undefined); + + const mockProcessOn = vi + .spyOn(process, "on") + .mockImplementation((_, __) => process); + + const mockProcessPrependListener = ( + vi.spyOn(process, "prependListener") as unknown as MockInstance< + [event: NodeJS.Signals, listener: NodeJS.SignalsListener], + NodeJS.Process + > + ).mockImplementation((_, __) => process); + + const mockListenerCount = vi + .spyOn(process, "listenerCount") + .mockReturnValue(0); + + function timedCallback(ms: number) { + return vi.fn().mockImplementation( + () => + new Promise((resolve) => { + setTimeout(resolve, ms); + }), + ); + } + + function getHandler(eventName: string, prepended = false) { + return prepended + ? mockProcessPrependListener.mock.calls.filter( + ([evt]) => evt === eventName, + )[0][1] + : mockProcessOn.mock.calls.filter(([evt]) => evt === eventName)[0][1]; + } + + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.resetAllMocks(); + }); + + describe("signal handling", () => { + const signals = ["SIGINT", "SIGTERM", "SIGHUP", "SIGBREAK"] as const; + + describe.each(signals)("signal %s", (signal) => { + it("should handle signal with no existing listeners", async () => { + mockListenerCount.mockReturnValue(0); + const callback = vi.fn().mockResolvedValue(undefined); + + subscribe(callback); + expect(mockProcessOn).toHaveBeenCalledWith( + signal, + expect.any(Function), + ); + + getHandler(signal)(signal); + await vi.runAllTimersAsync(); + + expect(callback).toHaveBeenCalledTimes(1); + expect(mockExit).toHaveBeenCalledWith(0x80 + constants.signals[signal]); + }); + + it("should prepend handler when listeners exist", async () => { + mockListenerCount.mockReturnValue(1); + const callback = vi.fn().mockResolvedValue(undefined); + + subscribe(callback); + + expect(mockProcessPrependListener).toHaveBeenCalledWith( + signal, + expect.any(Function), + ); + + getHandler(signal, true)(signal); + + expect(callback).toHaveBeenCalledTimes(1); + expect(mockExit).not.toHaveBeenCalled(); + }); + }); + }); + + describe("beforeExit handling", () => { + it("should call callback on beforeExit", async () => { + const callback = vi.fn().mockResolvedValue(undefined); + + subscribe(callback); + + getHandler("beforeExit")(); + + expect(callback).toHaveBeenCalledTimes(1); + }); + + it("should not call callback multiple times", async () => { + const callback = vi.fn().mockResolvedValue(undefined); + + subscribe(callback); + + getHandler("beforeExit")(); + getHandler("beforeExit")(); + + expect(callback).toHaveBeenCalledTimes(1); + }); + }); + + describe("timeout handling", () => { + it("should handle timeout when callback takes too long", async () => { + subscribe(timedCallback(2000), 1000); + + getHandler("beforeExit")(); + + await vi.advanceTimersByTimeAsync(1000); + + expect(mockConsoleError).toHaveBeenCalledWith( + "[Bucket SDK] Timeout while flushing events on process exit.", + ); + }); + + it("should not timeout when callback completes in time", async () => { + subscribe(timedCallback(500), 1000); + + getHandler("beforeExit")(); + await vi.advanceTimersByTimeAsync(500); + + expect(mockConsoleError).not.toHaveBeenCalled(); + }); + }); + + describe("exit state handling", () => { + it("should log error if exit occurs before flushing starts", () => { + subscribe(timedCallback(0)); + + getHandler("exit")(); + + expect(mockConsoleError).toHaveBeenCalledWith( + "[Bucket SDK] Failed to finalize the flushing of events on process exit.", + ); + }); + + it("should log error if exit occurs before flushing completes", async () => { + subscribe(timedCallback(2000)); + getHandler("beforeExit")(); + + await vi.advanceTimersByTimeAsync(1000); + + getHandler("exit")(); + + expect(mockConsoleError).toHaveBeenCalledWith( + "[Bucket SDK] Failed to finalize the flushing of events on process exit.", + ); + }); + + it("should not log error if flushing completes before exit", async () => { + subscribe(timedCallback(500)); + + getHandler("beforeExit")(); + await vi.advanceTimersByTimeAsync(500); + + getHandler("exit")(); + + expect(mockConsoleError).not.toHaveBeenCalled(); + }); + + it("should handle callback errors gracefully", async () => { + subscribe(vi.fn().mockRejectedValue(new Error("Test error"))); + + getHandler("beforeExit")(); + await vi.runAllTimersAsync(); + + expect(mockConsoleError).toHaveBeenCalledWith( + "[Bucket SDK] An error occurred while flushing events on process exit.", + expect.any(Error), + ); + }); + }); + + it("should run the callback only once", async () => { + const callback = vi.fn().mockResolvedValue(undefined); + + subscribe(callback); + + getHandler("SIGINT")("SIGINT"); + getHandler("beforeExit")(); + + await vi.runAllTimersAsync(); + + expect(callback).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/node-sdk/test/utils.test.ts b/packages/node-sdk/test/utils.test.ts index 0ef0e5be..43fc90ae 100644 --- a/packages/node-sdk/test/utils.test.ts +++ b/packages/node-sdk/test/utils.test.ts @@ -1,5 +1,5 @@ import { createHash } from "crypto"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { decorateLogger, @@ -8,6 +8,8 @@ import { mergeSkipUndefined, ok, once, + TimeoutError, + withTimeout, } from "../src/utils"; describe("isObject", () => { @@ -205,3 +207,94 @@ describe("once()", () => { expect(fn).toHaveBeenCalledTimes(1); }); }); + +describe("withTimeout()", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("should resolve when promise completes before timeout", async () => { + const promise = Promise.resolve("success"); + const result = withTimeout(promise, 1000); + + await expect(result).resolves.toBe("success"); + }); + + it("should reject with TimeoutError when promise takes too long", async () => { + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve("too late"), 2000); + }); + + const result = withTimeout(slowPromise, 1000); + + vi.advanceTimersByTime(1000); + + await expect(result).rejects.toThrow("Operation timed out after 1000ms"); + await expect(result).rejects.toBeInstanceOf(TimeoutError); + }); + + it("should propagate original promise rejection", async () => { + const error = new Error("original error"); + const failedPromise = Promise.reject(error); + + const result = withTimeout(failedPromise, 1000); + + await expect(result).rejects.toBe(error); + }); + + it("should reject immediately for negative timeout", async () => { + const promise = Promise.resolve("success"); + + await expect(async () => { + await withTimeout(promise, -1); + }).rejects.toThrow("validation failed: timeout must be a positive number"); + }); + + it("should reject immediately for zero timeout", async () => { + const promise = Promise.resolve("success"); + + await expect(async () => { + await withTimeout(promise, 0); + }).rejects.toThrow("validation failed: timeout must be a positive number"); + }); + + it("should clean up timeout when promise resolves", async () => { + const clearTimeoutSpy = vi.spyOn(global, "clearTimeout"); + const promise = Promise.resolve("success"); + + await withTimeout(promise, 1000); + await vi.runAllTimersAsync(); + + expect(clearTimeoutSpy).toHaveBeenCalled(); + clearTimeoutSpy.mockRestore(); + }); + + it("should clean up timeout when promise rejects", async () => { + const clearTimeoutSpy = vi.spyOn(global, "clearTimeout"); + const promise = Promise.reject(new Error("fail")); + + await expect(withTimeout(promise, 1000)).rejects.toThrow("fail"); + + expect(clearTimeoutSpy).toHaveBeenCalled(); + clearTimeoutSpy.mockRestore(); + }); + + it("should not resolve after timeout occurs", async () => { + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve("too late"), 2000); + }); + + const result = withTimeout(slowPromise, 1000); + + vi.advanceTimersByTime(1000); // Trigger timeout + await expect(result).rejects.toThrow("Operation timed out after 1000ms"); + + vi.advanceTimersByTime(1000); // Complete the original promise + // The promise should still be rejected with the timeout error + await expect(result).rejects.toThrow("Operation timed out after 1000ms"); + }); +});