From 983a2b99dd1ec51a74d8985e3f1585b9105b972f Mon Sep 17 00:00:00 2001
From: Simon Tower
Date: Thu, 14 Aug 2025 17:08:46 -0700
Subject: [PATCH 1/2] Enhanced script to use concurrent execution

Added concurrency vars to .env.example
---
 .env.example      |  12 ++++-
 index.ts          | 132 +++++++++++++++++++++++++++++++++++++++------
 package-lock.json |  17 ++++
 package.json      |   3 ++
 4 files changed, 143 insertions(+), 21 deletions(-)

diff --git a/.env.example b/.env.example
index 88134cb..dfb013f 100644
--- a/.env.example
+++ b/.env.example
@@ -4,7 +4,17 @@ CLERK_SECRET_KEY=sk_live_
 # Change this to 'true' if you want to import to a development instance
 IMPORT_TO_DEV_INSTANCE=false
 
-# Delay between createUser requests
+# Delay between createUser requests (deprecated - use CONCURRENCY_LIMIT instead)
 # Clerk's normal rate limit is 1 request/second
 DELAY=1050
 
+# Number of concurrent requests to make (default: 10)
+# Adjust based on your Clerk plan's rate limits
+CONCURRENCY_LIMIT=10
+
+# Delay when rate limited (default: 10000ms)
+RETRY_DELAY_MS=10000
+
+# Offset to start migration from (default: 0)
+OFFSET=0
+
diff --git a/index.ts b/index.ts
index 2e8e77f..08aa806 100755
--- a/index.ts
+++ b/index.ts
@@ -11,6 +11,7 @@ const DELAY = parseInt(process.env.DELAY_MS ?? `1_000`);
 const RETRY_DELAY = parseInt(process.env.RETRY_DELAY_MS ?? `10_000`);
 const IMPORT_TO_DEV = process.env.IMPORT_TO_DEV_INSTANCE ?? "false";
 const OFFSET = parseInt(process.env.OFFSET ?? `0`);
+const CONCURRENCY_LIMIT = parseInt(process.env.CONCURRENCY_LIMIT ?? `10`);
 
 if (!SECRET_KEY) {
   throw new Error(
@@ -74,7 +75,7 @@
       firstName: userData.firstName,
       lastName: userData.lastName,
       passwordDigest: userData.password,
-      passwordHasher: userData.passwordHasher,
+      passwordHasher: userData.passwordHasher as any, // Clerk SDK type issue
       privateMetadata: userData.private_metadata,
       publicMetadata: userData.public_metadata,
       unsafeMetadata: userData.unsafe_metadata,
@@ -91,25 +92,44 @@
   });
 
 const now = new Date().toISOString().split(".")[0]; // YYYY-MM-DDTHH:mm:ss
+const logFilePath = `./migration-log-${now}.json`;
+
+// Use a write stream for better performance
+let logStream: fs.WriteStream | null = null;
+
+function initLogStream() {
+  if (!logStream) {
+    logStream = fs.createWriteStream(logFilePath, { flags: 'a' });
+  }
+}
+
 function appendLog(payload: any) {
-  fs.appendFileSync(
-    `./migration-log-${now}.json`,
-    `\n${JSON.stringify(payload, null, 2)}`
-  );
+  initLogStream();
+  logStream!.write(`\n${JSON.stringify(payload, null, 2)}`);
 }
 
+// Ensure log stream is closed on exit
+process.on('exit', () => {
+  if (logStream) {
+    logStream.end();
+  }
+});
+
+process.on('SIGINT', () => {
+  if (logStream) {
+    logStream.end();
+  }
+  process.exit(0);
+});
+
 let migrated = 0;
 let alreadyExists = 0;
 
 async function processUserToClerk(userData: User, spinner: Ora) {
   const txt = spinner.text;
   try {
-    const parsedUserData = userSchema.safeParse(userData);
-    if (!parsedUserData.success) {
-      throw parsedUserData.error;
-    }
-    await createUser(parsedUserData.data);
-
+    // User data is already validated, so we can directly create the user
+    await createUser(userData);
     migrated++;
   } catch (error) {
     if (error.status === 422) {
@@ -131,13 +151,61 @@
 }
 
 async function cooldown() {
-  await new Promise((r) => setTimeout(r, DELAY));
+  // No delay needed - removed for performance
+  return Promise.resolve();
 }
 
 async function rateLimitCooldown() {
+  console.log(`Rate limit reached, waiting for ${RETRY_DELAY} ms...`);
   await new Promise((r) => setTimeout(r, RETRY_DELAY));
 }
 
+// Process users concurrently with a limit
+async function processConcurrently<T>(
+  items: T[],
+  processor: (item: T, index: number) => Promise<void>,
+  concurrencyLimit: number,
+  spinner: Ora
+): Promise<void> {
+  let completed = 0;
+  let running = 0;
+  let index = 0;
+
+  return new Promise<void>((resolve, reject) => {
+    const processNext = async () => {
+      if (index >= items.length) {
+        if (running === 0) resolve();
+        return;
+      }
+
+      const currentIndex = index++;
+      const item = items[currentIndex];
+      running++;
+
+      try {
+        spinner.text = `Processing user ${completed + 1}/${items.length} (${running} concurrent)`;
+        await processor(item, currentIndex);
+        completed++;
+        running--;
+
+        if (completed === items.length) {
+          resolve();
+        } else {
+          processNext();
+        }
+      } catch (error) {
+        running--;
+        reject(error);
+      }
+    };
+
+    // Start initial batch of concurrent operations
+    for (let i = 0; i < Math.min(concurrencyLimit, items.length); i++) {
+      processNext();
+    }
+  });
+}
+
 async function main() {
   console.log(`Clerk User Migration Utility`);
 
@@ -152,18 +220,42 @@
   console.log(
     `users.json found and parsed, attempting migration with an offset of ${OFFSET}`
   );
+  console.log(`Processing ${offsetUsers.length} users with concurrency limit of ${CONCURRENCY_LIMIT}`);
 
-  let i = 0;
-  const spinner = ora(`Migrating users`).start();
+  // Pre-validate all user data to catch errors early
+  const validatedUsers: User[] = [];
+  const validationErrors: any[] = [];
+
+  for (let i = 0; i < offsetUsers.length; i++) {
+    const parsed = userSchema.safeParse(offsetUsers[i]);
+    if (parsed.success) {
+      validatedUsers.push(parsed.data);
+    } else {
+      validationErrors.push({
+        index: i,
+        userId: offsetUsers[i]?.userId,
+        error: "error" in parsed ? parsed.error.errors : "Unknown validation error"
+      });
+    }
+  }
 
-  for (const userData of offsetUsers) {
-    spinner.text = `Migrating user ${i}/${offsetUsers.length}, cooldown`;
-    await cooldown();
-    i++;
-    spinner.text = `Migrating user ${i}/${offsetUsers.length}`;
-    await processUserToClerk(userData, spinner);
+  if (validationErrors.length > 0) {
+    console.log(`${validationErrors.length} users failed validation and will be skipped`);
+    validationErrors.forEach(err => appendLog(err));
   }
 
+  const spinner = ora(`Migrating users`).start();
+
+  // Process users concurrently
+  await processConcurrently(
+    validatedUsers,
+    async (userData: User, index: number) => {
+      await processUserToClerk(userData, spinner);
+    },
+    CONCURRENCY_LIMIT,
+    spinner
+  );
+
   spinner.succeed(`Migration complete`);
   return;
 }
diff --git a/package-lock.json b/package-lock.json
index e50a911..67c04b4 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -17,6 +17,9 @@
       },
       "bin": {
         "clerk-user-migration": "index.ts"
+      },
+      "devDependencies": {
+        "typescript": "^5.9.2"
       }
     },
     "node_modules/@clerk/backend": {
@@ -968,6 +971,20 @@
       "version": "2.6.2",
       "license": "0BSD"
     },
+    "node_modules/typescript": {
+      "version": "5.9.2",
+      "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.2.tgz",
+      "integrity": "sha512-CWBzXQrc/qOkhidw1OzBTQuYRbfyxDXJMVJ1XNwUHGROVmuaeiEm3OslpZ1RV96d7SKKjZKrSJu3+t/xlw3R9A==",
+      "dev": true,
+      "license": "Apache-2.0",
+      "bin": {
+        "tsc": "bin/tsc",
+        "tsserver": "bin/tsserver"
+      },
+      "engines": {
+        "node": ">=14.17"
+      }
+    },
     "node_modules/use-sync-external-store": {
       "version": "1.2.0",
       "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.2.0.tgz",
diff --git a/package.json b/package.json
index 060996b..3b47ed9 100644
--- a/package.json
+++ b/package.json
@@ -15,5 +15,8 @@
     "dotenv": "^16.3.1",
     "ora": "^7.0.1",
     "zod": "^3.22.4"
+  },
+  "devDependencies": {
+    "typescript": "^5.9.2"
   }
 }

From 10e8d80bad47082064f6d074c2d0be7db0e6cbad Mon Sep 17 00:00:00 2001
From: Simon Tower
Date: Thu, 14 Aug 2025 18:04:59 -0700
Subject: [PATCH 2/2] fix race condition

---
 index.ts | 32 ++++++++++++++++++++++++++++----
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/index.ts b/index.ts
index 08aa806..fb26120 100755
--- a/index.ts
+++ b/index.ts
@@ -125,8 +125,20 @@ process.on('SIGINT', () => {
 let migrated = 0;
 let alreadyExists = 0;
 
-async function processUserToClerk(userData: User, spinner: Ora) {
+// Set to track users already being processed or completed
+const processedUsers = new Set();
+
+async function processUserToClerk(userData: User, spinner: Ora, retryCount = 0) {
   const txt = spinner.text;
+
+  // Check if this user is already being processed or has been completed
+  if (processedUsers.has(userData.userId)) {
+    return;
+  }
+
+  // Mark user as being processed
+  processedUsers.add(userData.userId);
+
   try {
     // User data is already validated, so we can directly create the user
     await createUser(userData);
@@ -140,10 +152,22 @@
 
     // Keep cooldown in case rate limit is reached as a fallback if the thread blocking fails
     if (error.status === 429) {
-      spinner.text = `${txt} - rate limit reached, waiting for ${RETRY_DELAY} ms`;
-      await rateLimitCooldown();
+      // Remove from processed set since we're going to retry
+      processedUsers.delete(userData.userId);
+
+      // Add exponential backoff and jitter to prevent thundering herd
+      const backoffMs = RETRY_DELAY * Math.pow(2, retryCount) + Math.random() * 1000;
+      spinner.text = `${txt} - rate limit reached, waiting for ${Math.round(backoffMs)} ms (retry ${retryCount + 1})`;
+      await new Promise((r) => setTimeout(r, backoffMs));
       spinner.text = txt;
-      return processUserToClerk(userData, spinner);
+
+      // Limit retries to prevent infinite recursion
+      if (retryCount < 3) {
+        return processUserToClerk(userData, spinner, retryCount + 1);
+      } else {
+        appendLog({ userId: userData.userId, error: 'Max retries exceeded for rate limit', ...error });
+        return;
+      }
     }
 
     appendLog({ userId: userData.userId, ...error });