12 changes: 11 additions & 1 deletion .env.example
@@ -4,7 +4,17 @@ CLERK_SECRET_KEY=sk_live_
# Change this to 'true' if you want to import to a development instance
IMPORT_TO_DEV_INSTANCE=false

# Delay between createUser requests
# Delay between createUser requests (deprecated - use CONCURRENCY_LIMIT instead)
# Clerk's normal rate limit is 1 request/second
DELAY=1050

# Number of concurrent requests to make (default: 10)
# Adjust based on your Clerk plan's rate limits
CONCURRENCY_LIMIT=10

# Delay when rate limited (default: 10000ms)
RETRY_DELAY_MS=10000

# Offset to start migration from (default: 0)
OFFSET=0
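
A note on how these values are parsed: index.ts reads each variable with parseInt and a string default (first hunk below). Two details are worth flagging. First, .env.example sets DELAY while the code reads process.env.DELAY_MS, a pre-existing mismatch that is now moot because the concurrent path no longer sleeps between users. Second, parseInt stops at the first non-digit character, so a default written with a numeric separator, such as `10_000`, evaluates to 10 rather than 10000. A minimal sketch of the parsing behavior; the validation guard is an illustrative assumption, not part of the diff:

    // Illustrative sketch, not part of the PR: how the env defaults parse.
    // parseInt stops at the first non-digit character, so numeric-separator
    // strings are silently truncated: parseInt("10_000") === 10.
    const RETRY_DELAY = parseInt(process.env.RETRY_DELAY_MS ?? "10000"); // 10000, as intended
    const CONCURRENCY_LIMIT = parseInt(process.env.CONCURRENCY_LIMIT ?? "10");

    // Hypothetical guard (not in the PR): fail fast on bad configuration.
    if (!Number.isInteger(CONCURRENCY_LIMIT) || CONCURRENCY_LIMIT < 1) {
      throw new Error(
        `CONCURRENCY_LIMIT must be a positive integer, got "${process.env.CONCURRENCY_LIMIT}"`
      );
    }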

164 changes: 140 additions & 24 deletions index.ts
@@ -11,6 +11,7 @@ const DELAY = parseInt(process.env.DELAY_MS ?? `1_000`);
const RETRY_DELAY = parseInt(process.env.RETRY_DELAY_MS ?? `10_000`);
const IMPORT_TO_DEV = process.env.IMPORT_TO_DEV_INSTANCE ?? "false";
const OFFSET = parseInt(process.env.OFFSET ?? `0`);
const CONCURRENCY_LIMIT = parseInt(process.env.CONCURRENCY_LIMIT ?? `10`);

if (!SECRET_KEY) {
throw new Error(
@@ -74,7 +75,7 @@ const createUser = (userData: User) =>
firstName: userData.firstName,
lastName: userData.lastName,
passwordDigest: userData.password,
passwordHasher: userData.passwordHasher,
passwordHasher: userData.passwordHasher as any, // Clerk SDK type issue
privateMetadata: userData.private_metadata,
publicMetadata: userData.public_metadata,
unsafeMetadata: userData.unsafe_metadata,
@@ -91,25 +92,56 @@ });
});

const now = new Date().toISOString().split(".")[0]; // YYYY-MM-DDTHH:mm:ss
const logFilePath = `./migration-log-${now}.json`;

// Use a write stream for better performance
let logStream: fs.WriteStream | null = null;

function initLogStream() {
if (!logStream) {
logStream = fs.createWriteStream(logFilePath, { flags: 'a' });
}
}

function appendLog(payload: any) {
fs.appendFileSync(
`./migration-log-${now}.json`,
`\n${JSON.stringify(payload, null, 2)}`
);
initLogStream();
logStream!.write(`\n${JSON.stringify(payload, null, 2)}`);
}

// Ensure log stream is closed on exit
process.on('exit', () => {
if (logStream) {
logStream.end();
}
});

process.on('SIGINT', () => {
if (logStream) {
logStream.end();
}
process.exit(0);
});

let migrated = 0;
let alreadyExists = 0;

async function processUserToClerk(userData: User, spinner: Ora) {
// Set to track users already being processed or completed
const processedUsers = new Set<string>();

async function processUserToClerk(userData: User, spinner: Ora, retryCount = 0) {
const txt = spinner.text;

// Check if this user is already being processed or has been completed
if (processedUsers.has(userData.userId)) {
return;
}

// Mark user as being processed
processedUsers.add(userData.userId);

try {
const parsedUserData = userSchema.safeParse(userData);
if (!parsedUserData.success) {
throw parsedUserData.error;
}
await createUser(parsedUserData.data);

// User data is already validated, so we can directly create the user
await createUser(userData);
migrated++;
} catch (error) {
if (error.status === 422) {
@@ -120,24 +152,84 @@ async function processUserToClerk(userData: User, spinner: Ora) {

// Keep cooldown in case rate limit is reached as a fallback if the thread blocking fails
if (error.status === 429) {
spinner.text = `${txt} - rate limit reached, waiting for ${RETRY_DELAY} ms`;
await rateLimitCooldown();
// Remove from processed set since we're going to retry
processedUsers.delete(userData.userId);

// Add exponential backoff and jitter to prevent thundering herd
const backoffMs = RETRY_DELAY * Math.pow(2, retryCount) + Math.random() * 1000;
spinner.text = `${txt} - rate limit reached, waiting for ${Math.round(backoffMs)} ms (retry ${retryCount + 1})`;
await new Promise((r) => setTimeout(r, backoffMs));
spinner.text = txt;
return processUserToClerk(userData, spinner);

// Limit retries to prevent infinite recursion
if (retryCount < 3) {
return processUserToClerk(userData, spinner, retryCount + 1);
} else {
appendLog({ userId: userData.userId, error: 'Max retries exceeded for rate limit', ...error });
return;
}
}

appendLog({ userId: userData.userId, ...error });
}
}

async function cooldown() {
await new Promise((r) => setTimeout(r, DELAY));
// No delay needed - removed for performance
return Promise.resolve();
}

async function rateLimitCooldown() {
console.log(`Rate limit reached, waiting for ${RETRY_DELAY} ms...`);
await new Promise((r) => setTimeout(r, RETRY_DELAY));
}
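
The 429 branch above waits RETRY_DELAY * 2^retryCount plus up to one second of random jitter before re-queueing the user, and allows up to three retries, so the waits before them come to roughly 10 s, 20 s, and 40 s under the defaults. A standalone sketch of that schedule, assuming the default RETRY_DELAY_MS=10000 (illustrative only, mirroring the arithmetic above):

    // Sketch of the backoff schedule from the 429 branch, assuming the
    // default RETRY_DELAY_MS=10000 and the same three-retry cap.
    const RETRY_DELAY = 10000;
    for (let retryCount = 0; retryCount < 3; retryCount++) {
      const backoffMs = RETRY_DELAY * Math.pow(2, retryCount) + Math.random() * 1000;
      console.log(`retry ${retryCount + 1}: wait ~${Math.round(backoffMs)} ms`);
    }
    // Prints roughly 10s, 20s, then 40s. The jitter term spreads retries
    // from concurrent workers so they do not all hit the API at once.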

// Process users concurrently with a limit
async function processConcurrently<T>(
items: T[],
processor: (item: T, index: number) => Promise<void>,
concurrencyLimit: number,
spinner: Ora
): Promise<void> {
let completed = 0;
let running = 0;
let index = 0;

return new Promise((resolve, reject) => {
// Guard: with an empty input the startup loop below never launches a
// worker, so resolve immediately instead of hanging forever
if (items.length === 0) return resolve();
const processNext = async () => {
if (index >= items.length) {
if (running === 0) resolve();
return;
}

const currentIndex = index++;
const item = items[currentIndex];
running++;

try {
spinner.text = `Processing user ${completed + 1}/${items.length} (${running} concurrent)`;
await processor(item, currentIndex);
completed++;
running--;

if (completed === items.length) {
resolve();
} else {
processNext();
}
} catch (error) {
running--;
reject(error);
}
};

// Start initial batch of concurrent operations
for (let i = 0; i < Math.min(concurrencyLimit, items.length); i++) {
processNext();
}
});
}

async function main() {
console.log(`Clerk User Migration Utility`);

@@ -152,18 +244,42 @@ async function main() {
console.log(
`users.json found and parsed, attempting migration with an offset of ${OFFSET}`
);
console.log(`Processing ${offsetUsers.length} users with concurrency limit of ${CONCURRENCY_LIMIT}`);

let i = 0;
const spinner = ora(`Migrating users`).start();
// Pre-validate all user data to catch errors early
const validatedUsers: User[] = [];
const validationErrors: any[] = [];

for (let i = 0; i < offsetUsers.length; i++) {
const parsed = userSchema.safeParse(offsetUsers[i]);
if (parsed.success) {
validatedUsers.push(parsed.data);
} else {
validationErrors.push({
index: i,
userId: offsetUsers[i]?.userId,
error: "error" in parsed ? parsed.error.errors : "Unknown validation error"
});
}
}

for (const userData of offsetUsers) {
spinner.text = `Migrating user ${i}/${offsetUsers.length}, cooldown`;
await cooldown();
i++;
spinner.text = `Migrating user ${i}/${offsetUsers.length}`;
await processUserToClerk(userData, spinner);
if (validationErrors.length > 0) {
console.log(`${validationErrors.length} users failed validation and will be skipped`);
validationErrors.forEach(err => appendLog(err));
}

const spinner = ora(`Migrating users`).start();

// Process users concurrently
await processConcurrently(
validatedUsers,
async (userData: User, index: number) => {
await processUserToClerk(userData, spinner);
},
CONCURRENCY_LIMIT,
spinner
);

spinner.succeed(`Migration complete`);
return;
}
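For readers who want the concurrency pattern in isolation: processConcurrently above is a bounded worker pool driven by a shared index and callback recursion. The same behavior can be expressed more compactly with async worker loops. A hypothetical equivalent sketch (the helper name and shape are assumptions, not the PR's API):

    // Standalone sketch of the bounded-concurrency pattern used above;
    // an equivalent shape, not the PR's code.
    async function mapWithConcurrency<T>(
      items: T[],
      limit: number,
      worker: (item: T, index: number) => Promise<void>
    ): Promise<void> {
      let cursor = 0; // next index to claim, shared by all workers
      const workers = Array.from(
        { length: Math.min(limit, items.length) },
        async () => {
          while (cursor < items.length) {
            const i = cursor++; // single-threaded JS makes this claim atomic
            await worker(items[i], i);
          }
        }
      );
      await Promise.all(workers); // surfaces the first rejection, if any
    }

    // Usage mirroring the migration loop above:
    // await mapWithConcurrency(validatedUsers, CONCURRENCY_LIMIT,
    //   (userData) => processUserToClerk(userData, spinner));

One design note on the version in the diff: because a rejected processor calls reject while other tasks are still in flight, a single throw tears the pool down mid-run; processUserToClerk catches its own errors and logs them, so in practice that path should only be reachable on unexpected failures.
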
17 changes: 17 additions & 0 deletions package-lock.json

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions package.json
@@ -15,5 +15,8 @@
"dotenv": "^16.3.1",
"ora": "^7.0.1",
"zod": "^3.22.4"
},
"devDependencies": {
"typescript": "^5.9.2"
}
}