Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions packages/node-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,17 +315,21 @@ the number of calls that are sent to Bucket's servers. During process shutdown,
some messages could be waiting to be sent, and thus, would be discarded if the
buffer is not flushed.

A naive example:
By default, the SDK automatically subscribes to process exit signals and attempts to flush
any pending events. This behavior is controlled by the `flushOnExit` option in the client configuration:

```typescript
process.on("SIGINT", () => {
console.log("Flushing batch buffer...");
client.flush().then(() => {
process.exit(0);
});
const client = new BucketClient({
batchOptions: {
flushOnExit: false, // disable automatic flushing on exit
},
});
```

> [!NOTE]
> If you are creating multiple client instances in your application, it's recommended to disable `flushOnExit`
> to avoid potential conflicts during process shutdown. In such cases, you should implement your own flush handling.

When you bind a client to a user/company, this data is matched against the
targeting rules. To get accurate targeting, you must ensure that the user/company
information provided is sufficient to match against the targeting rules you've
Expand Down
5 changes: 0 additions & 5 deletions packages/node-sdk/example/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ let featureOverrides = (context: Context): FeatureOverrides => {
return { "delete-todos": true }; // feature keys checked at compile time
};

let host = undefined;
if (process.env.BUCKET_HOST) {
host = process.env.BUCKET_HOST;
}

// Create a new BucketClient instance with the secret key and default features
// The default features will be used if the user does not have any features set
// Create a bucketConfig.json file to configure the client or set environment variables
Expand Down
2 changes: 2 additions & 0 deletions packages/node-sdk/example/serve.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "dotenv/config";

import bucket from "./bucket";
import app from "./app";

Expand Down
2 changes: 1 addition & 1 deletion packages/node-sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@bucketco/node-sdk",
"version": "1.5.2",
"version": "1.5.3",
"license": "MIT",
"repository": {
"type": "git",
Expand Down
12 changes: 12 additions & 0 deletions packages/node-sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
SDK_VERSION_HEADER_NAME,
} from "./config";
import fetchClient from "./fetch-http-client";
import { subscribe as triggerOnExit } from "./flusher";
import { newRateLimiter } from "./rate-limiter";
import type {
EvaluatedFeaturesAPIResponse,
Expand Down Expand Up @@ -112,6 +113,7 @@ export class BucketClient {
offline: boolean;
configFile?: string;
};

private _initialize = once(async () => {
if (!this._config.offline) {
await this.getFeaturesCache().refresh();
Expand Down Expand Up @@ -239,6 +241,10 @@ export class BucketClient {
: () => config.featureOverrides,
};

if ((config.batchOptions?.flushOnExit ?? true) && !this._config.offline) {
triggerOnExit(() => this.flush());
}

if (!new URL(this._config.apiBaseUrl).pathname.endsWith("/")) {
this._config.apiBaseUrl += "/";
}
Expand Down Expand Up @@ -443,8 +449,14 @@ export class BucketClient {
* @remarks
* It is recommended to call this method when the application is shutting down to ensure all events are sent
* before the process exits.
*
* This method is automatically called when the process exits if `batchOptions.flushOnExit` is `true` in the options (default).
*/
public async flush() {
if (this._config.offline) {
return;
}

await this._config.batchBuffer.flush();
}

Expand Down
1 change: 1 addition & 0 deletions packages/node-sdk/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const API_BASE_URL = "https://front.bucket.co";
export const SDK_VERSION_HEADER_NAME = "bucket-sdk-version";
export const SDK_VERSION = `node-sdk/${version}`;
export const API_TIMEOUT_MS = 5000;
export const END_FLUSH_TIMEOUT_MS = 5000;

export const BUCKET_LOG_PREFIX = "[Bucket]";

Expand Down
62 changes: 62 additions & 0 deletions packages/node-sdk/src/flusher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { constants } from "os";

import { END_FLUSH_TIMEOUT_MS } from "./config";
import { TimeoutError, withTimeout } from "./utils";

type Callback = () => Promise<void>;

const killSignals = ["SIGINT", "SIGTERM", "SIGHUP", "SIGBREAK"] as const;

export function subscribe(
callback: Callback,
timeout: number = END_FLUSH_TIMEOUT_MS,
) {
let state: boolean | undefined;

const wrappedCallback = async () => {
if (state !== undefined) {
return;
}

state = false;

try {
await withTimeout(callback(), timeout);
} catch (error) {
if (error instanceof TimeoutError) {
console.error(
"[Bucket SDK] Timeout while flushing events on process exit.",
);
} else {
console.error(
"[Bucket SDK] An error occurred while flushing events on process exit.",
error,
);
}
}

state = true;
};

killSignals.forEach((signal) => {
const hasListeners = process.listenerCount(signal) > 0;

if (hasListeners) {
process.prependListener(signal, wrappedCallback);
} else {
process.on(signal, async () => {
await wrappedCallback();
process.exit(0x80 + constants.signals[signal]);
});
}
});

process.on("beforeExit", wrappedCallback);
process.on("exit", () => {
if (!state) {
console.error(
"[Bucket SDK] Failed to finalize the flushing of events on process exit.",
);
}
});
}
11 changes: 11 additions & 0 deletions packages/node-sdk/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,24 @@ export type BatchBufferOptions<T> = {

/**
* The maximum size of the buffer before it is flushed.
*
* @defaultValue `100`
**/
maxSize?: number;

/**
* The interval in milliseconds at which the buffer is flushed.
*
* @defaultValue `1000`
**/
intervalMs?: number;

/**
* Whether to flush the buffer on exit.
*
* @defaultValue `true`
*/
flushOnExit?: boolean;
};

/**
Expand Down
41 changes: 41 additions & 0 deletions packages/node-sdk/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,44 @@ export function once<T extends () => ReturnType<T>>(
return returned;
};
}

export class TimeoutError extends Error {
constructor(timeoutMs: number) {
super(`Operation timed out after ${timeoutMs}ms`);
this.name = "TimeoutError";
}
}

/**
* Wraps a promise with a timeout. If the promise doesn't resolve within the specified
* timeout, it will reject with a timeout error. The original promise will still
* continue to execute but its result will be ignored.
*
* @param promise - The promise to wrap with a timeout
* @param timeoutMs - The timeout in milliseconds
* @returns A promise that resolves with the original promise result or rejects with a timeout error
* @throws {Error} If the timeout is reached before the promise resolves
**/
export function withTimeout<T>(
promise: Promise<T>,
timeoutMs: number,
): Promise<T> {
ok(timeoutMs > 0, "timeout must be a positive number");

return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new TimeoutError(timeoutMs));
}, timeoutMs);

promise
.then((result) => {
resolve(result);
})
.catch((error) => {
reject(error);
})
.finally(() => {
clearTimeout(timeoutId);
});
});
}
48 changes: 48 additions & 0 deletions packages/node-sdk/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
SDK_VERSION_HEADER_NAME,
} from "../src/config";
import fetchClient from "../src/fetch-http-client";
import { subscribe as triggerOnExit } from "../src/flusher";
import { newRateLimiter } from "../src/rate-limiter";
import { ClientOptions, Context, FeaturesAPIResponse } from "../src/types";

Expand All @@ -45,6 +46,10 @@ vi.mock("../src/rate-limiter", async (importOriginal) => {
};
});

vi.mock("../src/flusher", () => ({
subscribe: vi.fn(),
}));

const user = {
id: "user123",
age: 1,
Expand Down Expand Up @@ -82,6 +87,7 @@ const validOptions: ClientOptions = {
batchOptions: {
maxSize: 99,
intervalMs: 100,
flushOnExit: false,
},
offline: false,
};
Expand Down Expand Up @@ -300,6 +306,36 @@ describe("BucketClient", () => {
);
});

it("should not register an exit flush handler if `batchOptions.flushOnExit` is false", () => {
new BucketClient({
...validOptions,
batchOptions: { ...validOptions.batchOptions, flushOnExit: false },
});

expect(triggerOnExit).not.toHaveBeenCalled();
});

it("should not register an exit flush handler if `offline` is true", () => {
new BucketClient({
...validOptions,
offline: true,
});

expect(triggerOnExit).not.toHaveBeenCalled();
});

it.each([undefined, true])(
"should register an exit flush handler if `batchOptions.flushOnExit` is `%s`",
(flushOnExit) => {
new BucketClient({
...validOptions,
batchOptions: { ...validOptions.batchOptions, flushOnExit },
});

expect(triggerOnExit).toHaveBeenCalledWith(expect.any(Function));
},
);

it.each([
["https://api.example.com", "https://api.example.com/bulk"],
["https://api.example.com/", "https://api.example.com/bulk"],
Expand Down Expand Up @@ -899,6 +935,18 @@ describe("BucketClient", () => {
],
);
});

it("should not flush all bulk data if `offline` is true", async () => {
const client = new BucketClient({
...validOptions,
offline: true,
});

await client.updateUser(user.id, { attributes: { age: 2 } });
await client.flush();

expect(httpClient.post).not.toHaveBeenCalled();
});
});

describe("getFeature", () => {
Expand Down
Loading
Loading