From 70e76e80302741d7baac626e4433f6c138c68bd0 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 7 Mar 2026 15:45:15 -0800 Subject: [PATCH 01/10] Add byok migration script for newly hosted keys --- .../scripts/migrate-block-api-keys-to-byok.ts | 562 ++++++++++++++++++ 1 file changed, 562 insertions(+) create mode 100644 packages/db/scripts/migrate-block-api-keys-to-byok.ts diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts new file mode 100644 index 00000000000..16150611103 --- /dev/null +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -0,0 +1,562 @@ +#!/usr/bin/env bun + +// Self-contained script for migrating block-level API keys into workspace BYOK keys. +// Original block-level values are left untouched for safety. +// Handles both literal keys ("sk-xxx...") and env var references ("{{VAR_NAME}}"). +// +// Usage: +// # Dry run: audit for conflicts + preview inserts (no DB writes) +// bun run packages/db/scripts/migrate-block-api-keys-to-byok.ts --dry-run \ +// --map jina=jina --map perplexity=perplexity --map google_books=google_cloud +// +// # Live run: insert BYOK keys +// bun run packages/db/scripts/migrate-block-api-keys-to-byok.ts \ +// --map jina=jina --map perplexity=perplexity --map google_books=google_cloud + +import { createCipheriv, createDecipheriv, randomBytes } from 'crypto' +import { eq, sql } from 'drizzle-orm' +import { index, json, jsonb, pgTable, text, timestamp, uniqueIndex } from 'drizzle-orm/pg-core' +import { drizzle } from 'drizzle-orm/postgres-js' +import postgres from 'postgres' +import { v4 as uuidv4 } from 'uuid' + +// ---------- CLI parsing ---------- +const DRY_RUN = process.argv.includes('--dry-run') +const BATCH_SIZE = 50 + +function parseMapArgs(): Record { + const mapping: Record = {} + const args = process.argv.slice(2) + for (let i = 0; i < args.length; i++) { + if (args[i] === '--map' && args[i + 1]) { + const [blockType, providerId] = 
args[i + 1].split('=') + if (blockType && providerId) { + mapping[blockType] = providerId + } else { + console.error(`Invalid --map value: "${args[i + 1]}". Expected format: blockType=providerId`) + process.exit(1) + } + i++ + } + } + return mapping +} + +const BLOCK_TYPE_TO_PROVIDER = parseMapArgs() +if (Object.keys(BLOCK_TYPE_TO_PROVIDER).length === 0) { + console.error('No --map arguments provided. Specify at least one: --map blockType=providerId') + console.error( + 'Example: --map jina=jina --map perplexity=perplexity --map google_books=google_cloud' + ) + process.exit(1) +} + +// ---------- Env ---------- +function getEnv(name: string): string | undefined { + if (typeof process !== 'undefined' && process.env && name in process.env) { + return process.env[name] + } + return undefined +} + +const CONNECTION_STRING = getEnv('POSTGRES_URL') ?? getEnv('DATABASE_URL') +if (!CONNECTION_STRING) { + console.error('Missing POSTGRES_URL or DATABASE_URL environment variable') + process.exit(1) +} + +const ENCRYPTION_KEY = getEnv('ENCRYPTION_KEY') +if (!ENCRYPTION_KEY || ENCRYPTION_KEY.length !== 64) { + console.error('ENCRYPTION_KEY must be set to a 64-character hex string (32 bytes)') + process.exit(1) +} + +// ---------- Inlined encryption helpers (mirrors apps/sim/lib/core/security/encryption.ts) ---------- +function getEncryptionKeyBuffer(): Buffer { + return Buffer.from(ENCRYPTION_KEY!, 'hex') +} + +async function encryptSecret(secret: string): Promise { + const iv = randomBytes(16) + const key = getEncryptionKeyBuffer() + const cipher = createCipheriv('aes-256-gcm', key, iv) + let encrypted = cipher.update(secret, 'utf8', 'hex') + encrypted += cipher.final('hex') + const authTag = cipher.getAuthTag() + return `${iv.toString('hex')}:${encrypted}:${authTag.toString('hex')}` +} + +async function decryptSecret(encryptedValue: string): Promise { + const parts = encryptedValue.split(':') + const ivHex = parts[0] + const authTagHex = parts[parts.length - 1] + const 
encrypted = parts.slice(1, -1).join(':') + + if (!ivHex || !encrypted || !authTagHex) { + throw new Error('Invalid encrypted value format. Expected "iv:encrypted:authTag"') + } + + const key = getEncryptionKeyBuffer() + const iv = Buffer.from(ivHex, 'hex') + const authTag = Buffer.from(authTagHex, 'hex') + + const decipher = createDecipheriv('aes-256-gcm', key, iv) + decipher.setAuthTag(authTag) + + let decrypted = decipher.update(encrypted, 'hex', 'utf8') + decrypted += decipher.final('utf8') + return decrypted +} + +// ---------- Minimal schema ---------- +const workflow = pgTable('workflow', { + id: text('id').primaryKey(), + userId: text('user_id').notNull(), + workspaceId: text('workspace_id'), + name: text('name').notNull(), +}) + +const workflowBlocks = pgTable( + 'workflow_blocks', + { + id: text('id').primaryKey(), + workflowId: text('workflow_id').notNull(), + type: text('type').notNull(), + name: text('name').notNull(), + subBlocks: jsonb('sub_blocks').notNull().default('{}'), + createdAt: timestamp('created_at').notNull().defaultNow(), + updatedAt: timestamp('updated_at').notNull().defaultNow(), + }, + (table) => ({ + workflowIdIdx: index('workflow_blocks_workflow_id_idx').on(table.workflowId), + }) +) + +const workspaceBYOKKeys = pgTable( + 'workspace_byok_keys', + { + id: text('id').primaryKey(), + workspaceId: text('workspace_id').notNull(), + providerId: text('provider_id').notNull(), + encryptedApiKey: text('encrypted_api_key').notNull(), + createdBy: text('created_by'), + createdAt: timestamp('created_at').notNull().defaultNow(), + updatedAt: timestamp('updated_at').notNull().defaultNow(), + }, + (table) => ({ + workspaceProviderUnique: uniqueIndex('workspace_byok_provider_unique').on( + table.workspaceId, + table.providerId + ), + workspaceIdx: index('workspace_byok_workspace_idx').on(table.workspaceId), + }) +) + +const environment = pgTable('environment', { + id: text('id').primaryKey(), + userId: text('user_id').notNull(), + variables: 
json('variables').notNull(), +}) + +const workspaceEnvironment = pgTable('workspace_environment', { + id: text('id').primaryKey(), + workspaceId: text('workspace_id').notNull(), + variables: json('variables').notNull().default('{}'), +}) + +// ---------- DB client ---------- +const postgresClient = postgres(CONNECTION_STRING, { + prepare: false, + idle_timeout: 20, + connect_timeout: 30, + max: 10, + onnotice: () => {}, +}) +const db = drizzle(postgresClient) + +// ---------- Agent/HITL nested tool handling ---------- +const TOOL_INPUT_SUBBLOCK_IDS: Record = { + agent: 'tools', + human_in_the_loop: 'notification', +} + +const ENV_VAR_PATTERN = /^\{\{([^}]+)\}\}$/ + +function isEnvVarReference(value: string): boolean { + return ENV_VAR_PATTERN.test(value) +} + +function extractEnvVarName(value: string): string | null { + const match = ENV_VAR_PATTERN.exec(value) + return match ? match[1].trim() : null +} + +function maskKey(key: string): string { + if (key.length <= 8) return '•'.repeat(8) + return key.slice(0, 4) + '•'.repeat(Math.min(key.length - 8, 12)) + key.slice(-4) +} + +function parseToolInputValue(value: unknown): any[] { + if (Array.isArray(value)) return value + if (typeof value === 'string') { + try { + const parsed = JSON.parse(value) + if (Array.isArray(parsed)) return parsed + } catch {} + } + return [] +} + +// ---------- Types ---------- +type KeyEntry = { + workspaceId: string + providerId: string + apiKey: string + userId: string + blockId: string + blockName: string + workflowId: string + workflowName: string + rawValue: string + isEnvVar: boolean +} + +// ---------- Main ---------- +async function run() { + console.log(`Mode: ${DRY_RUN ? 'DRY RUN (audit + preview)' : 'LIVE'}`) + console.log(`Mappings: ${Object.entries(BLOCK_TYPE_TO_PROVIDER).map(([b, p]) => `${b}=${p}`).join(', ')}`) + console.log('---\n') + + try { + // 1. 
Build block type list: mapped types + agent/HITL for nested tools + const mappedBlockTypes = Object.keys(BLOCK_TYPE_TO_PROVIDER) + const agentTypes = Object.keys(TOOL_INPUT_SUBBLOCK_IDS) + const allBlockTypes = [...new Set([...mappedBlockTypes, ...agentTypes])] + + const rows = await db + .select({ + blockId: workflowBlocks.id, + blockName: workflowBlocks.name, + blockType: workflowBlocks.type, + subBlocks: workflowBlocks.subBlocks, + workflowId: workflow.id, + workflowName: workflow.name, + userId: workflow.userId, + workspaceId: workflow.workspaceId, + }) + .from(workflowBlocks) + .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id)) + .where( + sql`${workflowBlocks.type} IN (${sql.join( + allBlockTypes.map((t) => sql`${t}`), + sql`, ` + )})` + ) + + console.log(`Found ${rows.length} candidate blocks\n`) + + // 2. Pre-load env vars for resolving {{VAR}} references + const personalEnvRows = await db.select().from(environment) + const workspaceEnvRows = await db.select().from(workspaceEnvironment) + + const personalEnvByUser = new Map>() + for (const row of personalEnvRows) { + personalEnvByUser.set(row.userId, (row.variables as Record) || {}) + } + + const workspaceEnvByWs = new Map>() + for (const row of workspaceEnvRows) { + workspaceEnvByWs.set(row.workspaceId, (row.variables as Record) || {}) + } + + console.log( + `Loaded env vars: ${personalEnvByUser.size} users, ${workspaceEnvByWs.size} workspaces\n` + ) + + async function resolveApiKeyValue( + value: string, + workspaceId: string, + userId: string, + context: string + ): Promise<{ resolvedKey: string | null; isEnvVar: boolean; failed: boolean }> { + if (isEnvVarReference(value)) { + const varName = extractEnvVarName(value) + if (varName) { + const wsVars = workspaceEnvByWs.get(workspaceId) + const personalVars = personalEnvByUser.get(userId) + const encryptedValue = wsVars?.[varName] ?? 
personalVars?.[varName] + + if (encryptedValue) { + try { + const resolved = await decryptSecret(encryptedValue) + return { resolvedKey: resolved, isEnvVar: true, failed: false } + } catch (error) { + console.warn( + ` [WARN] Failed to decrypt env var "${varName}" for ${context}: ${error}` + ) + return { resolvedKey: null, isEnvVar: true, failed: true } + } + } else { + console.warn(` [WARN] Env var "${varName}" not found for ${context}`) + return { resolvedKey: null, isEnvVar: true, failed: true } + } + } + return { resolvedKey: null, isEnvVar: true, failed: true } + } + + return { resolvedKey: value, isEnvVar: false, failed: false } + } + + // 3. Scan all blocks and collect resolved keys + const allEntries: KeyEntry[] = [] + let literalCount = 0 + let envVarCount = 0 + let nestedToolKeyCount = 0 + let envVarResolutionFailures = 0 + let skippedNoWorkspace = 0 + let skippedEmptyKey = 0 + + for (const row of rows as any[]) { + const subBlocks = row.subBlocks as Record + const workspaceId = row.workspaceId as string | null + if (!workspaceId) { + skippedNoWorkspace++ + continue + } + + // --- Direct apiKey on the block --- + const providerId = BLOCK_TYPE_TO_PROVIDER[row.blockType] + if (providerId) { + const apiKeyValue = subBlocks?.apiKey?.value + if (typeof apiKeyValue === 'string' && apiKeyValue.trim()) { + const { resolvedKey, isEnvVar, failed } = await resolveApiKeyValue( + apiKeyValue, + workspaceId, + row.userId, + `block ${row.blockId}` + ) + + if (isEnvVar) envVarCount++ + else literalCount++ + if (failed) envVarResolutionFailures++ + + if (resolvedKey?.trim()) { + allEntries.push({ + workspaceId, + providerId, + apiKey: resolvedKey, + userId: row.userId, + blockId: row.blockId, + blockName: row.blockName, + workflowId: row.workflowId, + workflowName: row.workflowName, + rawValue: apiKeyValue, + isEnvVar, + }) + } + } else { + skippedEmptyKey++ + } + } + + // --- Nested tools inside agent / human_in_the_loop --- + const toolInputId = 
TOOL_INPUT_SUBBLOCK_IDS[row.blockType] + if (toolInputId) { + const toolInputSubBlock = subBlocks?.[toolInputId] + if (toolInputSubBlock) { + const tools = parseToolInputValue(toolInputSubBlock.value) + for (const tool of tools) { + const toolType = tool?.type as string | undefined + const toolApiKey = tool?.params?.apiKey as string | undefined + if (!toolType || !toolApiKey || !toolApiKey.trim()) continue + + const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType] + if (!toolProviderId) continue + + nestedToolKeyCount++ + + const { resolvedKey, isEnvVar, failed } = await resolveApiKeyValue( + toolApiKey, + workspaceId, + row.userId, + `nested tool "${toolType}" in block ${row.blockId}` + ) + + if (isEnvVar) envVarCount++ + else literalCount++ + if (failed) envVarResolutionFailures++ + + if (resolvedKey?.trim()) { + allEntries.push({ + workspaceId, + providerId: toolProviderId, + apiKey: resolvedKey, + userId: row.userId, + blockId: row.blockId, + blockName: `${row.blockName} > tool "${tool.title || toolType}"`, + workflowId: row.workflowId, + workflowName: row.workflowName, + rawValue: toolApiKey, + isEnvVar, + }) + } + } + } + } + } + + console.log(`Literal API keys: ${literalCount}`) + console.log(`Env var references: ${envVarCount}`) + console.log(`Nested tool keys (agent/HITL): ${nestedToolKeyCount}`) + console.log(`Env var resolution failures: ${envVarResolutionFailures}`) + console.log(`Skipped (no workspace): ${skippedNoWorkspace}`) + console.log(`Skipped (empty key): ${skippedEmptyKey}`) + console.log(`Total resolved key entries: ${allEntries.length}\n`) + + // 4. 
Deduplicate by (workspaceId, providerId) — first key wins + const byokInserts = new Map< + string, + { workspaceId: string; providerId: string; apiKey: string; userId: string } + >() + for (const entry of allEntries) { + const dedupeKey = `${entry.workspaceId}::${entry.providerId}` + if (!byokInserts.has(dedupeKey)) { + byokInserts.set(dedupeKey, { + workspaceId: entry.workspaceId, + providerId: entry.providerId, + apiKey: entry.apiKey, + userId: entry.userId, + }) + } + } + + console.log(`Unique (workspace, provider) pairs to insert: ${byokInserts.size}\n`) + + // 5. Dry run: audit for conflicts + preview + if (DRY_RUN) { + // Group entries by (workspace, provider) to detect conflicts + const groupMap = new Map() + for (const entry of allEntries) { + const key = `${entry.workspaceId}::${entry.providerId}` + if (!groupMap.has(key)) groupMap.set(key, []) + groupMap.get(key)!.push(entry) + } + + let conflictCount = 0 + for (const [groupKey, entries] of groupMap) { + const distinctKeys = new Set(entries.map((e) => e.apiKey)) + if (distinctKeys.size <= 1) continue + + conflictCount++ + const [wsId, provId] = groupKey.split('::') + console.log( + ` [CONFLICT] workspace ${wsId}, provider "${provId}" has ${distinctKeys.size} distinct keys:` + ) + for (const entry of entries) { + const keyDisplay = entry.isEnvVar ? entry.rawValue : maskKey(entry.rawValue) + const typeLabel = entry.isEnvVar ? 'env var' : 'literal' + const resolvedNote = entry.isEnvVar ? ` -> resolves to ${maskKey(entry.apiKey)}` : '' + console.log( + ` Block "${entry.blockName}" in workflow "${entry.workflowName}": ${keyDisplay} (${typeLabel})${resolvedNote}` + ) + } + console.log() + } + + if (conflictCount === 0) { + console.log('No conflicts detected.\n') + } else { + console.log( + `${conflictCount} conflict(s) found. 
First key per (workspace, provider) will be used.\n` + ) + } + + for (const entry of byokInserts.values()) { + console.log( + ` [DRY RUN] Would insert BYOK key for workspace ${entry.workspaceId}, provider "${entry.providerId}": ${maskKey(entry.apiKey)}` + ) + } + + console.log('\n[DRY RUN] No changes were made to the database.') + console.log('Run without --dry-run to apply changes.') + return + } + + // 6. Live: insert into workspace_byok_keys + const insertEntries = Array.from(byokInserts.values()) + let insertedCount = 0 + let skippedConflictCount = 0 + let insertErrorCount = 0 + + for (let i = 0; i < insertEntries.length; i += BATCH_SIZE) { + const batch = insertEntries.slice(i, i + BATCH_SIZE) + const batchNum = Math.floor(i / BATCH_SIZE) + 1 + console.log( + `Insert batch ${batchNum} (${i + 1}-${Math.min(i + BATCH_SIZE, insertEntries.length)} of ${insertEntries.length})` + ) + + for (const entry of batch) { + try { + const encrypted = await encryptSecret(entry.apiKey) + + const result = await db + .insert(workspaceBYOKKeys) + .values({ + id: uuidv4(), + workspaceId: entry.workspaceId, + providerId: entry.providerId, + encryptedApiKey: encrypted, + createdBy: entry.userId, + }) + .onConflictDoNothing({ + target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId], + }) + + if ((result as any).rowCount === 0) { + console.log( + ` [SKIP] BYOK key already exists for workspace ${entry.workspaceId}, provider "${entry.providerId}"` + ) + skippedConflictCount++ + } else { + console.log( + ` [INSERT] BYOK key for workspace ${entry.workspaceId}, provider "${entry.providerId}": ${maskKey(entry.apiKey)}` + ) + insertedCount++ + } + } catch (error) { + console.error( + ` [ERROR] Failed to insert BYOK key for workspace ${entry.workspaceId}, provider "${entry.providerId}":`, + error + ) + insertErrorCount++ + } + } + } + + // 7. 
Summary + console.log('\n---') + console.log('Migration Summary:') + console.log(` BYOK keys inserted: ${insertedCount}`) + console.log(` BYOK keys skipped (already existed): ${skippedConflictCount}`) + console.log(` BYOK insert errors: ${insertErrorCount}`) + console.log(` Env var resolution failures: ${envVarResolutionFailures}`) + console.log('\nMigration completed successfully!') + } catch (error) { + console.error('Fatal error during migration:', error) + process.exit(1) + } finally { + try { + await postgresClient.end({ timeout: 5 }) + } catch {} + } +} + +run() + .then(() => { + console.log('\nDone!') + process.exit(0) + }) + .catch((error) => { + console.error('Unexpected error:', error) + process.exit(1) + }) From 01b21ed0040a5292ff4feb3f3d76566cc3ef831b Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 7 Mar 2026 16:12:37 -0800 Subject: [PATCH 02/10] Change script to iterate on workflow --- .../scripts/migrate-block-api-keys-to-byok.ts | 445 ++++++++---------- 1 file changed, 190 insertions(+), 255 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index 16150611103..c34485d6899 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -1,7 +1,7 @@ #!/usr/bin/env bun // Self-contained script for migrating block-level API keys into workspace BYOK keys. -// Original block-level values are left untouched for safety. +// Iterates per workspace. Original block-level values are left untouched for safety. // Handles both literal keys ("sk-xxx...") and env var references ("{{VAR_NAME}}"). 
// // Usage: @@ -20,9 +20,8 @@ import { drizzle } from 'drizzle-orm/postgres-js' import postgres from 'postgres' import { v4 as uuidv4 } from 'uuid' -// ---------- CLI parsing ---------- +// ---------- CLI ---------- const DRY_RUN = process.argv.includes('--dry-run') -const BATCH_SIZE = 50 function parseMapArgs(): Record { const mapping: Record = {} @@ -71,7 +70,7 @@ if (!ENCRYPTION_KEY || ENCRYPTION_KEY.length !== 64) { process.exit(1) } -// ---------- Inlined encryption helpers (mirrors apps/sim/lib/core/security/encryption.ts) ---------- +// ---------- Encryption (mirrors apps/sim/lib/core/security/encryption.ts) ---------- function getEncryptionKeyBuffer(): Buffer { return Buffer.from(ENCRYPTION_KEY!, 'hex') } @@ -108,7 +107,7 @@ async function decryptSecret(encryptedValue: string): Promise { return decrypted } -// ---------- Minimal schema ---------- +// ---------- Schema ---------- const workflow = pgTable('workflow', { id: text('id').primaryKey(), userId: text('user_id').notNull(), @@ -164,7 +163,7 @@ const workspaceEnvironment = pgTable('workspace_environment', { variables: json('variables').notNull().default('{}'), }) -// ---------- DB client ---------- +// ---------- DB ---------- const postgresClient = postgres(CONNECTION_STRING, { prepare: false, idle_timeout: 20, @@ -174,7 +173,7 @@ const postgresClient = postgres(CONNECTION_STRING, { }) const db = drizzle(postgresClient) -// ---------- Agent/HITL nested tool handling ---------- +// ---------- Helpers ---------- const TOOL_INPUT_SUBBLOCK_IDS: Record = { agent: 'tools', human_in_the_loop: 'notification', @@ -207,28 +206,33 @@ function parseToolInputValue(value: unknown): any[] { return [] } -// ---------- Types ---------- -type KeyEntry = { - workspaceId: string - providerId: string - apiKey: string - userId: string - blockId: string +type RawKeyRef = { + rawValue: string blockName: string - workflowId: string workflowName: string - rawValue: string - isEnvVar: boolean + userId: string } // ---------- 
Main ---------- async function run() { console.log(`Mode: ${DRY_RUN ? 'DRY RUN (audit + preview)' : 'LIVE'}`) - console.log(`Mappings: ${Object.entries(BLOCK_TYPE_TO_PROVIDER).map(([b, p]) => `${b}=${p}`).join(', ')}`) + console.log( + `Mappings: ${Object.entries(BLOCK_TYPE_TO_PROVIDER).map(([b, p]) => `${b}=${p}`).join(', ')}` + ) console.log('---\n') + const stats = { + workspacesProcessed: 0, + workspacesSkipped: 0, + conflicts: 0, + inserted: 0, + skippedExisting: 0, + errors: 0, + envVarFailures: 0, + } + try { - // 1. Build block type list: mapped types + agent/HITL for nested tools + // 1. Find all blocks that match our mapped types or contain nested tools const mappedBlockTypes = Object.keys(BLOCK_TYPE_TO_PROVIDER) const agentTypes = Object.keys(TOOL_INPUT_SUBBLOCK_IDS) const allBlockTypes = [...new Set([...mappedBlockTypes, ...agentTypes])] @@ -253,296 +257,227 @@ async function run() { )})` ) - console.log(`Found ${rows.length} candidate blocks\n`) - - // 2. Pre-load env vars for resolving {{VAR}} references - const personalEnvRows = await db.select().from(environment) - const workspaceEnvRows = await db.select().from(workspaceEnvironment) - - const personalEnvByUser = new Map>() - for (const row of personalEnvRows) { - personalEnvByUser.set(row.userId, (row.variables as Record) || {}) - } - - const workspaceEnvByWs = new Map>() - for (const row of workspaceEnvRows) { - workspaceEnvByWs.set(row.workspaceId, (row.variables as Record) || {}) - } - - console.log( - `Loaded env vars: ${personalEnvByUser.size} users, ${workspaceEnvByWs.size} workspaces\n` - ) - - async function resolveApiKeyValue( - value: string, - workspaceId: string, - userId: string, - context: string - ): Promise<{ resolvedKey: string | null; isEnvVar: boolean; failed: boolean }> { - if (isEnvVarReference(value)) { - const varName = extractEnvVarName(value) - if (varName) { - const wsVars = workspaceEnvByWs.get(workspaceId) - const personalVars = personalEnvByUser.get(userId) - const 
encryptedValue = wsVars?.[varName] ?? personalVars?.[varName] - - if (encryptedValue) { - try { - const resolved = await decryptSecret(encryptedValue) - return { resolvedKey: resolved, isEnvVar: true, failed: false } - } catch (error) { - console.warn( - ` [WARN] Failed to decrypt env var "${varName}" for ${context}: ${error}` - ) - return { resolvedKey: null, isEnvVar: true, failed: true } - } - } else { - console.warn(` [WARN] Env var "${varName}" not found for ${context}`) - return { resolvedKey: null, isEnvVar: true, failed: true } - } - } - return { resolvedKey: null, isEnvVar: true, failed: true } - } - - return { resolvedKey: value, isEnvVar: false, failed: false } - } - - // 3. Scan all blocks and collect resolved keys - const allEntries: KeyEntry[] = [] - let literalCount = 0 - let envVarCount = 0 - let nestedToolKeyCount = 0 - let envVarResolutionFailures = 0 + // Group rows by workspace + const workspaceRows = new Map() let skippedNoWorkspace = 0 - let skippedEmptyKey = 0 - - for (const row of rows as any[]) { - const subBlocks = row.subBlocks as Record - const workspaceId = row.workspaceId as string | null - if (!workspaceId) { + for (const row of rows) { + if (!row.workspaceId) { skippedNoWorkspace++ continue } + if (!workspaceRows.has(row.workspaceId)) workspaceRows.set(row.workspaceId, []) + workspaceRows.get(row.workspaceId)!.push(row) + } - // --- Direct apiKey on the block --- - const providerId = BLOCK_TYPE_TO_PROVIDER[row.blockType] - if (providerId) { - const apiKeyValue = subBlocks?.apiKey?.value - if (typeof apiKeyValue === 'string' && apiKeyValue.trim()) { - const { resolvedKey, isEnvVar, failed } = await resolveApiKeyValue( - apiKeyValue, - workspaceId, - row.userId, - `block ${row.blockId}` - ) + console.log(`Found ${rows.length} candidate blocks across ${workspaceRows.size} workspaces`) + if (skippedNoWorkspace > 0) console.log(`Skipped ${skippedNoWorkspace} blocks with no workspace`) + console.log() - if (isEnvVar) envVarCount++ - else 
literalCount++ - if (failed) envVarResolutionFailures++ + // 2. Iterate per workspace + for (const [workspaceId, blocks] of workspaceRows) { + console.log(`[Workspace ${workspaceId}] ${blocks.length} blocks`) - if (resolvedKey?.trim()) { - allEntries.push({ - workspaceId, - providerId, - apiKey: resolvedKey, - userId: row.userId, - blockId: row.blockId, - blockName: row.blockName, - workflowId: row.workflowId, - workflowName: row.workflowName, - rawValue: apiKeyValue, - isEnvVar, + // 2a. Extract all raw key references grouped by provider + const providerKeys = new Map() + + function addRef(providerId: string, ref: RawKeyRef) { + if (!providerKeys.has(providerId)) providerKeys.set(providerId, []) + providerKeys.get(providerId)!.push(ref) + } + + for (const block of blocks) { + const subBlocks = block.subBlocks as Record + + const providerId = BLOCK_TYPE_TO_PROVIDER[block.blockType] + if (providerId) { + const val = subBlocks?.apiKey?.value + if (typeof val === 'string' && val.trim()) { + addRef(providerId, { + rawValue: val, + blockName: block.blockName, + workflowName: block.workflowName, + userId: block.userId, }) } - } else { - skippedEmptyKey++ } - } - // --- Nested tools inside agent / human_in_the_loop --- - const toolInputId = TOOL_INPUT_SUBBLOCK_IDS[row.blockType] - if (toolInputId) { - const toolInputSubBlock = subBlocks?.[toolInputId] - if (toolInputSubBlock) { - const tools = parseToolInputValue(toolInputSubBlock.value) + const toolInputId = TOOL_INPUT_SUBBLOCK_IDS[block.blockType] + if (toolInputId) { + const tools = parseToolInputValue(subBlocks?.[toolInputId]?.value) for (const tool of tools) { const toolType = tool?.type as string | undefined const toolApiKey = tool?.params?.apiKey as string | undefined if (!toolType || !toolApiKey || !toolApiKey.trim()) continue - const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType] if (!toolProviderId) continue + addRef(toolProviderId, { + rawValue: toolApiKey, + blockName: `${block.blockName} > tool 
"${tool.title || toolType}"`, + workflowName: block.workflowName, + userId: block.userId, + }) + } + } + } - nestedToolKeyCount++ + if (providerKeys.size === 0) { + console.log(' No API keys found, skipping\n') + stats.workspacesSkipped++ + continue + } - const { resolvedKey, isEnvVar, failed } = await resolveApiKeyValue( - toolApiKey, - workspaceId, - row.userId, - `nested tool "${toolType}" in block ${row.blockId}` - ) + // 2b. Load env vars only if this workspace has env var references + const needsEnvVars = [...providerKeys.values()] + .flat() + .some((ref) => isEnvVarReference(ref.rawValue)) + + let wsEnvVars: Record = {} + const personalEnvCache = new Map>() + + if (needsEnvVars) { + const wsEnvRows = await db + .select() + .from(workspaceEnvironment) + .where(sql`${workspaceEnvironment.workspaceId} = ${workspaceId}`) + .limit(1) + if (wsEnvRows[0]) { + wsEnvVars = (wsEnvRows[0].variables as Record) || {} + } - if (isEnvVar) envVarCount++ - else literalCount++ - if (failed) envVarResolutionFailures++ - - if (resolvedKey?.trim()) { - allEntries.push({ - workspaceId, - providerId: toolProviderId, - apiKey: resolvedKey, - userId: row.userId, - blockId: row.blockId, - blockName: `${row.blockName} > tool "${tool.title || toolType}"`, - workflowId: row.workflowId, - workflowName: row.workflowName, - rawValue: toolApiKey, - isEnvVar, - }) - } + const userIds = [...new Set([...providerKeys.values()].flat().map((r) => r.userId))] + if (userIds.length > 0) { + const personalRows = await db + .select() + .from(environment) + .where( + sql`${environment.userId} IN (${sql.join( + userIds.map((id) => sql`${id}`), + sql`, ` + )})` + ) + for (const row of personalRows) { + personalEnvCache.set(row.userId, (row.variables as Record) || {}) } } } - } - console.log(`Literal API keys: ${literalCount}`) - console.log(`Env var references: ${envVarCount}`) - console.log(`Nested tool keys (agent/HITL): ${nestedToolKeyCount}`) - console.log(`Env var resolution failures: 
${envVarResolutionFailures}`) - console.log(`Skipped (no workspace): ${skippedNoWorkspace}`) - console.log(`Skipped (empty key): ${skippedEmptyKey}`) - console.log(`Total resolved key entries: ${allEntries.length}\n`) - - // 4. Deduplicate by (workspaceId, providerId) — first key wins - const byokInserts = new Map< - string, - { workspaceId: string; providerId: string; apiKey: string; userId: string } - >() - for (const entry of allEntries) { - const dedupeKey = `${entry.workspaceId}::${entry.providerId}` - if (!byokInserts.has(dedupeKey)) { - byokInserts.set(dedupeKey, { - workspaceId: entry.workspaceId, - providerId: entry.providerId, - apiKey: entry.apiKey, - userId: entry.userId, - }) - } - } + async function resolveKey( + ref: RawKeyRef, + context: string + ): Promise { + if (!isEnvVarReference(ref.rawValue)) return ref.rawValue - console.log(`Unique (workspace, provider) pairs to insert: ${byokInserts.size}\n`) + const varName = extractEnvVarName(ref.rawValue) + if (!varName) { + stats.envVarFailures++ + return null + } - // 5. Dry run: audit for conflicts + preview - if (DRY_RUN) { - // Group entries by (workspace, provider) to detect conflicts - const groupMap = new Map() - for (const entry of allEntries) { - const key = `${entry.workspaceId}::${entry.providerId}` - if (!groupMap.has(key)) groupMap.set(key, []) - groupMap.get(key)!.push(entry) - } + const personalVars = personalEnvCache.get(ref.userId) + const encryptedValue = wsEnvVars[varName] ?? 
personalVars?.[varName] + if (!encryptedValue) { + console.warn(` [WARN] Env var "${varName}" not found (${context})`) + stats.envVarFailures++ + return null + } - let conflictCount = 0 - for (const [groupKey, entries] of groupMap) { - const distinctKeys = new Set(entries.map((e) => e.apiKey)) - if (distinctKeys.size <= 1) continue - - conflictCount++ - const [wsId, provId] = groupKey.split('::') - console.log( - ` [CONFLICT] workspace ${wsId}, provider "${provId}" has ${distinctKeys.size} distinct keys:` - ) - for (const entry of entries) { - const keyDisplay = entry.isEnvVar ? entry.rawValue : maskKey(entry.rawValue) - const typeLabel = entry.isEnvVar ? 'env var' : 'literal' - const resolvedNote = entry.isEnvVar ? ` -> resolves to ${maskKey(entry.apiKey)}` : '' - console.log( - ` Block "${entry.blockName}" in workflow "${entry.workflowName}": ${keyDisplay} (${typeLabel})${resolvedNote}` - ) + try { + return await decryptSecret(encryptedValue) + } catch (error) { + console.warn(` [WARN] Failed to decrypt env var "${varName}" (${context}): ${error}`) + stats.envVarFailures++ + return null } - console.log() } - if (conflictCount === 0) { - console.log('No conflicts detected.\n') - } else { - console.log( - `${conflictCount} conflict(s) found. First key per (workspace, provider) will be used.\n` - ) - } + // 2c. 
For each provider, detect conflicts then resolve and insert + stats.workspacesProcessed++ - for (const entry of byokInserts.values()) { - console.log( - ` [DRY RUN] Would insert BYOK key for workspace ${entry.workspaceId}, provider "${entry.providerId}": ${maskKey(entry.apiKey)}` - ) - } + for (const [providerId, refs] of providerKeys) { + // Resolve all keys for this provider to check for conflicts + const resolved: { ref: RawKeyRef; key: string }[] = [] + for (const ref of refs) { + const key = await resolveKey(ref, `"${ref.blockName}" in "${ref.workflowName}"`) + if (key?.trim()) resolved.push({ ref, key }) + } - console.log('\n[DRY RUN] No changes were made to the database.') - console.log('Run without --dry-run to apply changes.') - return - } + if (resolved.length === 0) continue + + // Detect conflicting values + const distinctKeys = new Set(resolved.map((r) => r.key)) + if (distinctKeys.size > 1) { + stats.conflicts++ + console.log(` [CONFLICT] provider "${providerId}": ${distinctKeys.size} distinct keys`) + for (const { ref, key } of resolved) { + const display = isEnvVarReference(ref.rawValue) + ? `${ref.rawValue} -> ${maskKey(key)}` + : maskKey(ref.rawValue) + console.log(` "${ref.blockName}" in "${ref.workflowName}": ${display}`) + } + console.log(' Using first resolved key') + } - // 6. 
Live: insert into workspace_byok_keys - const insertEntries = Array.from(byokInserts.values()) - let insertedCount = 0 - let skippedConflictCount = 0 - let insertErrorCount = 0 - - for (let i = 0; i < insertEntries.length; i += BATCH_SIZE) { - const batch = insertEntries.slice(i, i + BATCH_SIZE) - const batchNum = Math.floor(i / BATCH_SIZE) + 1 - console.log( - `Insert batch ${batchNum} (${i + 1}-${Math.min(i + BATCH_SIZE, insertEntries.length)} of ${insertEntries.length})` - ) + // Use the first resolved key + const chosen = resolved[0] - for (const entry of batch) { - try { - const encrypted = await encryptSecret(entry.apiKey) + if (DRY_RUN) { + console.log( + ` [DRY RUN] Would insert BYOK for provider "${providerId}": ${maskKey(chosen.key)}` + ) + continue + } + // Insert into BYOK + try { + const encrypted = await encryptSecret(chosen.key) const result = await db .insert(workspaceBYOKKeys) .values({ id: uuidv4(), - workspaceId: entry.workspaceId, - providerId: entry.providerId, + workspaceId, + providerId, encryptedApiKey: encrypted, - createdBy: entry.userId, + createdBy: chosen.ref.userId, }) .onConflictDoNothing({ target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId], }) if ((result as any).rowCount === 0) { - console.log( - ` [SKIP] BYOK key already exists for workspace ${entry.workspaceId}, provider "${entry.providerId}"` - ) - skippedConflictCount++ + console.log(` [SKIP] BYOK already exists for provider "${providerId}"`) + stats.skippedExisting++ } else { - console.log( - ` [INSERT] BYOK key for workspace ${entry.workspaceId}, provider "${entry.providerId}": ${maskKey(entry.apiKey)}` - ) - insertedCount++ + console.log(` [INSERT] BYOK for provider "${providerId}": ${maskKey(chosen.key)}`) + stats.inserted++ } } catch (error) { - console.error( - ` [ERROR] Failed to insert BYOK key for workspace ${entry.workspaceId}, provider "${entry.providerId}":`, - error - ) - insertErrorCount++ + console.error(` [ERROR] Failed to insert BYOK for 
provider "${providerId}":`, error) + stats.errors++ } } + + console.log() } - // 7. Summary - console.log('\n---') - console.log('Migration Summary:') - console.log(` BYOK keys inserted: ${insertedCount}`) - console.log(` BYOK keys skipped (already existed): ${skippedConflictCount}`) - console.log(` BYOK insert errors: ${insertErrorCount}`) - console.log(` Env var resolution failures: ${envVarResolutionFailures}`) - console.log('\nMigration completed successfully!') + // 3. Summary + console.log('---') + console.log('Summary:') + console.log(` Workspaces processed: ${stats.workspacesProcessed}`) + console.log(` Workspaces skipped (no keys): ${stats.workspacesSkipped}`) + console.log(` BYOK keys inserted: ${stats.inserted}`) + console.log(` BYOK keys skipped (already existed): ${stats.skippedExisting}`) + console.log(` Conflicts (multiple distinct keys): ${stats.conflicts}`) + console.log(` Insert errors: ${stats.errors}`) + console.log(` Env var resolution failures: ${stats.envVarFailures}`) + + if (DRY_RUN) { + console.log('\n[DRY RUN] No changes were made to the database.') + console.log('Run without --dry-run to apply changes.') + } else { + console.log('\nMigration completed successfully!') + } } catch (error) { - console.error('Fatal error during migration:', error) + console.error('Fatal error:', error) process.exit(1) } finally { try { From 6e7bdaedda63334412bca0891cade27d4adef22a Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 7 Mar 2026 16:32:21 -0800 Subject: [PATCH 03/10] Refactor to iterate per workspace to avoid overconsuming memory --- .../scripts/migrate-block-api-keys-to-byok.ts | 137 +++++++++--------- 1 file changed, 71 insertions(+), 66 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index c34485d6899..0ea759d08ee 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -213,6 
+213,37 @@ type RawKeyRef = { userId: string } +type EnvLookup = { + wsEnvVars: Record + personalEnvCache: Map> +} + +async function resolveKey( + ref: RawKeyRef, + context: string, + env: EnvLookup +): Promise<{ key: string | null; envVarFailed: boolean }> { + if (!isEnvVarReference(ref.rawValue)) return { key: ref.rawValue, envVarFailed: false } + + const varName = extractEnvVarName(ref.rawValue) + if (!varName) return { key: null, envVarFailed: true } + + const personalVars = env.personalEnvCache.get(ref.userId) + const encryptedValue = env.wsEnvVars[varName] ?? personalVars?.[varName] + if (!encryptedValue) { + console.warn(` [WARN] Env var "${varName}" not found (${context})`) + return { key: null, envVarFailed: true } + } + + try { + const decrypted = await decryptSecret(encryptedValue) + return { key: decrypted, envVarFailed: false } + } catch (error) { + console.warn(` [WARN] Failed to decrypt env var "${varName}" (${context}): ${error}`) + return { key: null, envVarFailed: true } + } +} + // ---------- Main ---------- async function run() { console.log(`Mode: ${DRY_RUN ? 'DRY RUN (audit + preview)' : 'LIVE'}`) @@ -232,59 +263,54 @@ async function run() { } try { - // 1. Find all blocks that match our mapped types or contain nested tools + // 1. 
Get distinct workspace IDs that have matching blocks const mappedBlockTypes = Object.keys(BLOCK_TYPE_TO_PROVIDER) const agentTypes = Object.keys(TOOL_INPUT_SUBBLOCK_IDS) const allBlockTypes = [...new Set([...mappedBlockTypes, ...agentTypes])] - const rows = await db - .select({ - blockId: workflowBlocks.id, - blockName: workflowBlocks.name, - blockType: workflowBlocks.type, - subBlocks: workflowBlocks.subBlocks, - workflowId: workflow.id, - workflowName: workflow.name, - userId: workflow.userId, - workspaceId: workflow.workspaceId, - }) + const workspaceIdRows = await db + .selectDistinct({ workspaceId: workflow.workspaceId }) .from(workflowBlocks) .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id)) .where( - sql`${workflowBlocks.type} IN (${sql.join( + sql`${workflow.workspaceId} IS NOT NULL AND ${workflowBlocks.type} IN (${sql.join( allBlockTypes.map((t) => sql`${t}`), sql`, ` )})` ) - // Group rows by workspace - const workspaceRows = new Map() - let skippedNoWorkspace = 0 - for (const row of rows) { - if (!row.workspaceId) { - skippedNoWorkspace++ - continue - } - if (!workspaceRows.has(row.workspaceId)) workspaceRows.set(row.workspaceId, []) - workspaceRows.get(row.workspaceId)!.push(row) - } - - console.log(`Found ${rows.length} candidate blocks across ${workspaceRows.size} workspaces`) - if (skippedNoWorkspace > 0) console.log(`Skipped ${skippedNoWorkspace} blocks with no workspace`) - console.log() + const workspaceIds = workspaceIdRows + .map((r) => r.workspaceId) + .filter((id): id is string => id !== null) + + console.log(`Found ${workspaceIds.length} workspaces with candidate blocks\n`) + + // 2. 
Process one workspace at a time + for (const workspaceId of workspaceIds) { + const blocks = await db + .select({ + blockId: workflowBlocks.id, + blockName: workflowBlocks.name, + blockType: workflowBlocks.type, + subBlocks: workflowBlocks.subBlocks, + workflowId: workflow.id, + workflowName: workflow.name, + userId: workflow.userId, + }) + .from(workflowBlocks) + .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id)) + .where( + sql`${workflow.workspaceId} = ${workspaceId} AND ${workflowBlocks.type} IN (${sql.join( + allBlockTypes.map((t) => sql`${t}`), + sql`, ` + )})` + ) - // 2. Iterate per workspace - for (const [workspaceId, blocks] of workspaceRows) { console.log(`[Workspace ${workspaceId}] ${blocks.length} blocks`) // 2a. Extract all raw key references grouped by provider const providerKeys = new Map() - function addRef(providerId: string, ref: RawKeyRef) { - if (!providerKeys.has(providerId)) providerKeys.set(providerId, []) - providerKeys.get(providerId)!.push(ref) - } - for (const block of blocks) { const subBlocks = block.subBlocks as Record @@ -292,12 +318,14 @@ async function run() { if (providerId) { const val = subBlocks?.apiKey?.value if (typeof val === 'string' && val.trim()) { - addRef(providerId, { + const refs = providerKeys.get(providerId) ?? [] + refs.push({ rawValue: val, blockName: block.blockName, workflowName: block.workflowName, userId: block.userId, }) + providerKeys.set(providerId, refs) } } @@ -310,12 +338,14 @@ async function run() { if (!toolType || !toolApiKey || !toolApiKey.trim()) continue const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType] if (!toolProviderId) continue - addRef(toolProviderId, { + const refs = providerKeys.get(toolProviderId) ?? 
[] + refs.push({ rawValue: toolApiKey, blockName: `${block.blockName} > tool "${tool.title || toolType}"`, workflowName: block.workflowName, userId: block.userId, }) + providerKeys.set(toolProviderId, refs) } } } @@ -361,34 +391,7 @@ async function run() { } } - async function resolveKey( - ref: RawKeyRef, - context: string - ): Promise { - if (!isEnvVarReference(ref.rawValue)) return ref.rawValue - - const varName = extractEnvVarName(ref.rawValue) - if (!varName) { - stats.envVarFailures++ - return null - } - - const personalVars = personalEnvCache.get(ref.userId) - const encryptedValue = wsEnvVars[varName] ?? personalVars?.[varName] - if (!encryptedValue) { - console.warn(` [WARN] Env var "${varName}" not found (${context})`) - stats.envVarFailures++ - return null - } - - try { - return await decryptSecret(encryptedValue) - } catch (error) { - console.warn(` [WARN] Failed to decrypt env var "${varName}" (${context}): ${error}`) - stats.envVarFailures++ - return null - } - } + const envLookup: EnvLookup = { wsEnvVars, personalEnvCache } // 2c. 
For each provider, detect conflicts then resolve and insert stats.workspacesProcessed++ @@ -397,7 +400,9 @@ async function run() { // Resolve all keys for this provider to check for conflicts const resolved: { ref: RawKeyRef; key: string }[] = [] for (const ref of refs) { - const key = await resolveKey(ref, `"${ref.blockName}" in "${ref.workflowName}"`) + const context = `"${ref.blockName}" in "${ref.workflowName}"` + const { key, envVarFailed } = await resolveKey(ref, context, envLookup) + if (envVarFailed) stats.envVarFailures++ if (key?.trim()) resolved.push({ ref, key }) } From 9a1b68120c449e4f572936c96f55e37a76aa6a06 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 7 Mar 2026 16:41:29 -0800 Subject: [PATCH 04/10] Fix row count bug --- .../scripts/migrate-block-api-keys-to-byok.ts | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index 0ea759d08ee..980d10b2cc2 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -12,6 +12,10 @@ // # Live run: insert BYOK keys // bun run packages/db/scripts/migrate-block-api-keys-to-byok.ts \ // --map jina=jina --map perplexity=perplexity --map google_books=google_cloud +// +// # Optionally scope to specific users (repeatable) +// bun run packages/db/scripts/migrate-block-api-keys-to-byok.ts --dry-run \ +// --map jina=jina --user user_abc123 --user user_def456 import { createCipheriv, createDecipheriv, randomBytes } from 'crypto' import { eq, sql } from 'drizzle-orm' @@ -50,6 +54,20 @@ if (Object.keys(BLOCK_TYPE_TO_PROVIDER).length === 0) { process.exit(1) } +function parseUserArgs(): string[] { + const users: string[] = [] + const args = process.argv.slice(2) + for (let i = 0; i < args.length; i++) { + if (args[i] === '--user' && args[i + 1]) { + users.push(args[i + 1]) + i++ + } + } + return users +} + 
+const USER_FILTER = parseUserArgs() + // ---------- Env ---------- function getEnv(name: string): string | undefined { if (typeof process !== 'undefined' && process.env && name in process.env) { @@ -250,6 +268,7 @@ async function run() { console.log( `Mappings: ${Object.entries(BLOCK_TYPE_TO_PROVIDER).map(([b, p]) => `${b}=${p}`).join(', ')}` ) + console.log(`Users: ${USER_FILTER.length > 0 ? USER_FILTER.join(', ') : 'all'}`) console.log('---\n') const stats = { @@ -268,6 +287,14 @@ async function run() { const agentTypes = Object.keys(TOOL_INPUT_SUBBLOCK_IDS) const allBlockTypes = [...new Set([...mappedBlockTypes, ...agentTypes])] + const userFilter = + USER_FILTER.length > 0 + ? sql` AND ${workflow.userId} IN (${sql.join( + USER_FILTER.map((id) => sql`${id}`), + sql`, ` + )})` + : sql`` + const workspaceIdRows = await db .selectDistinct({ workspaceId: workflow.workspaceId }) .from(workflowBlocks) @@ -276,7 +303,7 @@ async function run() { sql`${workflow.workspaceId} IS NOT NULL AND ${workflowBlocks.type} IN (${sql.join( allBlockTypes.map((t) => sql`${t}`), sql`, ` - )})` + )})${userFilter}` ) const workspaceIds = workspaceIdRows @@ -303,7 +330,7 @@ async function run() { sql`${workflow.workspaceId} = ${workspaceId} AND ${workflowBlocks.type} IN (${sql.join( allBlockTypes.map((t) => sql`${t}`), sql`, ` - )})` + )})${userFilter}` ) console.log(`[Workspace ${workspaceId}] ${blocks.length} blocks`) @@ -447,8 +474,9 @@ async function run() { .onConflictDoNothing({ target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId], }) + .returning({ id: workspaceBYOKKeys.id }) - if ((result as any).rowCount === 0) { + if (result.length === 0) { console.log(` [SKIP] BYOK already exists for provider "${providerId}"`) stats.skippedExisting++ } else { From a2329e2a2a4dffa021e5145d813774366a29f719 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 7 Mar 2026 17:07:21 -0800 Subject: [PATCH 05/10] Add sleep every 1000, dry run writes to files that prod consumes from 
--- .../scripts/migrate-block-api-keys-to-byok.ts | 106 ++++++++++++++---- 1 file changed, 86 insertions(+), 20 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index 980d10b2cc2..1477013a64a 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -5,19 +5,23 @@ // Handles both literal keys ("sk-xxx...") and env var references ("{{VAR_NAME}}"). // // Usage: -// # Dry run: audit for conflicts + preview inserts (no DB writes) +// # Step 1 — Dry run: audit for conflicts + preview inserts (no DB writes) +// # Outputs migrate-byok-workspace-ids.txt for the live run. // bun run packages/db/scripts/migrate-block-api-keys-to-byok.ts --dry-run \ // --map jina=jina --map perplexity=perplexity --map google_books=google_cloud // -// # Live run: insert BYOK keys +// # Step 2 — Live run: insert BYOK keys (--from-file is required) // bun run packages/db/scripts/migrate-block-api-keys-to-byok.ts \ -// --map jina=jina --map perplexity=perplexity --map google_books=google_cloud +// --map jina=jina --map perplexity=perplexity --map google_books=google_cloud \ +// --from-file migrate-byok-workspace-ids.txt // -// # Optionally scope to specific users (repeatable) +// # Optionally scope dry run to specific users (repeatable) // bun run packages/db/scripts/migrate-block-api-keys-to-byok.ts --dry-run \ // --map jina=jina --user user_abc123 --user user_def456 import { createCipheriv, createDecipheriv, randomBytes } from 'crypto' +import { readFileSync, writeFileSync } from 'fs' +import { resolve } from 'path' import { eq, sql } from 'drizzle-orm' import { index, json, jsonb, pgTable, text, timestamp, uniqueIndex } from 'drizzle-orm/pg-core' import { drizzle } from 'drizzle-orm/postgres-js' @@ -36,7 +40,9 @@ function parseMapArgs(): Record { if (blockType && providerId) { mapping[blockType] = providerId } else { - 
console.error(`Invalid --map value: "${args[i + 1]}". Expected format: blockType=providerId`) + console.error( + `Invalid --map value: "${args[i + 1]}". Expected format: blockType=providerId` + ) process.exit(1) } i++ @@ -68,6 +74,27 @@ function parseUserArgs(): string[] { const USER_FILTER = parseUserArgs() +function parseFromFileArg(): string | null { + const args = process.argv.slice(2) + for (let i = 0; i < args.length; i++) { + if (args[i] === '--from-file' && args[i + 1]) { + return args[i + 1] + } + } + return null +} + +const FROM_FILE = parseFromFileArg() + +if (!DRY_RUN && !FROM_FILE) { + console.error('Live runs require --from-file. Run with --dry-run first to generate the file.') + process.exit(1) +} +if (DRY_RUN && FROM_FILE) { + console.error('--from-file cannot be used with --dry-run. Dry runs always discover workspaces from the database.') + process.exit(1) +} + // ---------- Env ---------- function getEnv(name: string): string | undefined { if (typeof process !== 'undefined' && process.env && name in process.env) { @@ -191,6 +218,23 @@ const postgresClient = postgres(CONNECTION_STRING, { }) const db = drizzle(postgresClient) +// ---------- Throttle ---------- +const BATCH_SIZE = 1000 +const SLEEP_MS = 30_000 +let requestCount = 0 +let lastWorkspaceId = '' + +async function throttle(workspaceId?: string) { + if (workspaceId) lastWorkspaceId = workspaceId + requestCount++ + if (requestCount % BATCH_SIZE === 0) { + console.log( + ` [THROTTLE] ${requestCount} DB requests — last workspace: ${lastWorkspaceId} — sleeping ${SLEEP_MS / 1000}s` + ) + await new Promise((r) => setTimeout(r, SLEEP_MS)) + } +} + // ---------- Helpers ---------- const TOOL_INPUT_SUBBLOCK_IDS: Record = { agent: 'tools', @@ -266,9 +310,12 @@ async function resolveKey( async function run() { console.log(`Mode: ${DRY_RUN ? 
'DRY RUN (audit + preview)' : 'LIVE'}`) console.log( - `Mappings: ${Object.entries(BLOCK_TYPE_TO_PROVIDER).map(([b, p]) => `${b}=${p}`).join(', ')}` + `Mappings: ${Object.entries(BLOCK_TYPE_TO_PROVIDER) + .map(([b, p]) => `${b}=${p}`) + .join(', ')}` ) console.log(`Users: ${USER_FILTER.length > 0 ? USER_FILTER.join(', ') : 'all'}`) + if (FROM_FILE) console.log(`From file: ${FROM_FILE}`) console.log('---\n') const stats = { @@ -295,22 +342,37 @@ async function run() { )})` : sql`` - const workspaceIdRows = await db - .selectDistinct({ workspaceId: workflow.workspaceId }) - .from(workflowBlocks) - .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id)) - .where( - sql`${workflow.workspaceId} IS NOT NULL AND ${workflowBlocks.type} IN (${sql.join( - allBlockTypes.map((t) => sql`${t}`), - sql`, ` - )})${userFilter}` - ) + let workspaceIds: string[] + + if (DRY_RUN) { + const workspaceIdRows = await db + .selectDistinct({ workspaceId: workflow.workspaceId }) + .from(workflowBlocks) + .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id)) + .where( + sql`${workflow.workspaceId} IS NOT NULL AND ${workflowBlocks.type} IN (${sql.join( + allBlockTypes.map((t) => sql`${t}`), + sql`, ` + )})${userFilter}` + ) + + workspaceIds = workspaceIdRows + .map((r) => r.workspaceId) + .filter((id): id is string => id !== null) - const workspaceIds = workspaceIdRows - .map((r) => r.workspaceId) - .filter((id): id is string => id !== null) + console.log(`Found ${workspaceIds.length} workspaces with candidate blocks\n`) - console.log(`Found ${workspaceIds.length} workspaces with candidate blocks\n`) + const outPath = resolve('migrate-byok-workspace-ids.txt') + writeFileSync(outPath, workspaceIds.join('\n') + '\n') + console.log(`[DRY RUN] Wrote ${workspaceIds.length} workspace IDs to ${outPath}\n`) + } else { + const raw = readFileSync(resolve(FROM_FILE!), 'utf-8') + workspaceIds = raw + .split('\n') + .map((l) => l.trim()) + .filter(Boolean) + console.log(`Loaded 
${workspaceIds.length} workspace IDs from ${FROM_FILE}\n`) + } // 2. Process one workspace at a time for (const workspaceId of workspaceIds) { @@ -332,6 +394,7 @@ async function run() { sql`, ` )})${userFilter}` ) + await throttle(workspaceId) console.log(`[Workspace ${workspaceId}] ${blocks.length} blocks`) @@ -397,6 +460,7 @@ async function run() { .from(workspaceEnvironment) .where(sql`${workspaceEnvironment.workspaceId} = ${workspaceId}`) .limit(1) + await throttle() if (wsEnvRows[0]) { wsEnvVars = (wsEnvRows[0].variables as Record) || {} } @@ -412,6 +476,7 @@ async function run() { sql`, ` )})` ) + await throttle() for (const row of personalRows) { personalEnvCache.set(row.userId, (row.variables as Record) || {}) } @@ -475,6 +540,7 @@ async function run() { target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId], }) .returning({ id: workspaceBYOKKeys.id }) + await throttle() if (result.length === 0) { console.log(` [SKIP] BYOK already exists for provider "${providerId}"`) From e127817436563bb278e9380a72c6c89393a7518c Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 7 Mar 2026 17:20:24 -0800 Subject: [PATCH 06/10] Fix lint --- packages/db/scripts/migrate-block-api-keys-to-byok.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index 1477013a64a..ce947112880 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -91,7 +91,9 @@ if (!DRY_RUN && !FROM_FILE) { process.exit(1) } if (DRY_RUN && FROM_FILE) { - console.error('--from-file cannot be used with --dry-run. Dry runs always discover workspaces from the database.') + console.error( + '--from-file cannot be used with --dry-run. Dry runs always discover workspaces from the database.' 
+ ) process.exit(1) } @@ -363,7 +365,7 @@ async function run() { console.log(`Found ${workspaceIds.length} workspaces with candidate blocks\n`) const outPath = resolve('migrate-byok-workspace-ids.txt') - writeFileSync(outPath, workspaceIds.join('\n') + '\n') + writeFileSync(outPath, `${workspaceIds.join('\n')}\n`) console.log(`[DRY RUN] Wrote ${workspaceIds.length} workspace IDs to ${outPath}\n`) } else { const raw = readFileSync(resolve(FROM_FILE!), 'utf-8') From e8ac2736a330d0a8deb8aba1ef427193f3337e15 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 9 Mar 2026 15:28:31 -0700 Subject: [PATCH 07/10] Sleep only between workspaces, not during single workspace --- .../scripts/migrate-block-api-keys-to-byok.ts | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index ce947112880..f449b8fb568 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -221,17 +221,16 @@ const postgresClient = postgres(CONNECTION_STRING, { const db = drizzle(postgresClient) // ---------- Throttle ---------- -const BATCH_SIZE = 1000 +const WORKSPACE_BATCH_SIZE = 1000 const SLEEP_MS = 30_000 -let requestCount = 0 -let lastWorkspaceId = '' +let workspaceCount = 0 -async function throttle(workspaceId?: string) { - if (workspaceId) lastWorkspaceId = workspaceId - requestCount++ - if (requestCount % BATCH_SIZE === 0) { +async function throttleBetweenWorkspaces(workspaceId: string, total: number) { + workspaceCount++ + console.log(` [${workspaceCount}/${total}] Done with workspace ${workspaceId}`) + if (workspaceCount % WORKSPACE_BATCH_SIZE === 0) { console.log( - ` [THROTTLE] ${requestCount} DB requests — last workspace: ${lastWorkspaceId} — sleeping ${SLEEP_MS / 1000}s` + ` [THROTTLE] ${workspaceCount}/${total} workspaces — sleeping ${SLEEP_MS / 1000}s` ) await new 
Promise((r) => setTimeout(r, SLEEP_MS)) } @@ -396,7 +395,6 @@ async function run() { sql`, ` )})${userFilter}` ) - await throttle(workspaceId) console.log(`[Workspace ${workspaceId}] ${blocks.length} blocks`) @@ -462,7 +460,6 @@ async function run() { .from(workspaceEnvironment) .where(sql`${workspaceEnvironment.workspaceId} = ${workspaceId}`) .limit(1) - await throttle() if (wsEnvRows[0]) { wsEnvVars = (wsEnvRows[0].variables as Record) || {} } @@ -478,7 +475,6 @@ async function run() { sql`, ` )})` ) - await throttle() for (const row of personalRows) { personalEnvCache.set(row.userId, (row.variables as Record) || {}) } @@ -542,7 +538,6 @@ async function run() { target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId], }) .returning({ id: workspaceBYOKKeys.id }) - await throttle() if (result.length === 0) { console.log(` [SKIP] BYOK already exists for provider "${providerId}"`) @@ -558,6 +553,7 @@ async function run() { } console.log() + await throttleBetweenWorkspaces(workspaceId, workspaceIds.length) } // 3. 
Summary From bba16f2032db797e43a5fe3bf4d09754cfcf5701 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 9 Mar 2026 15:46:39 -0700 Subject: [PATCH 08/10] Prefer plain > workspace > personal --- .../scripts/migrate-block-api-keys-to-byok.ts | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index f449b8fb568..e8bfeff30e5 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -281,29 +281,44 @@ type EnvLookup = { personalEnvCache: Map> } +type KeySource = 'plaintext' | 'workspace' | 'personal' + +const KEY_SOURCE_PRIORITY: Record = { + plaintext: 0, + workspace: 1, + personal: 2, +} + async function resolveKey( ref: RawKeyRef, context: string, env: EnvLookup -): Promise<{ key: string | null; envVarFailed: boolean }> { - if (!isEnvVarReference(ref.rawValue)) return { key: ref.rawValue, envVarFailed: false } +): Promise<{ key: string | null; source: KeySource; envVarFailed: boolean }> { + if (!isEnvVarReference(ref.rawValue)) { + return { key: ref.rawValue, source: 'plaintext', envVarFailed: false } + } const varName = extractEnvVarName(ref.rawValue) - if (!varName) return { key: null, envVarFailed: true } + if (!varName) return { key: null, source: 'personal', envVarFailed: true } const personalVars = env.personalEnvCache.get(ref.userId) - const encryptedValue = env.wsEnvVars[varName] ?? personalVars?.[varName] + + const wsValue = env.wsEnvVars[varName] + const personalValue = personalVars?.[varName] + const encryptedValue = wsValue ?? personalValue + const source: KeySource = wsValue ? 
'workspace' : 'personal' + if (!encryptedValue) { console.warn(` [WARN] Env var "${varName}" not found (${context})`) - return { key: null, envVarFailed: true } + return { key: null, source, envVarFailed: true } } try { const decrypted = await decryptSecret(encryptedValue) - return { key: decrypted, envVarFailed: false } + return { key: decrypted, source, envVarFailed: false } } catch (error) { console.warn(` [WARN] Failed to decrypt env var "${varName}" (${context}): ${error}`) - return { key: null, envVarFailed: true } + return { key: null, source, envVarFailed: true } } } @@ -488,31 +503,34 @@ async function run() { for (const [providerId, refs] of providerKeys) { // Resolve all keys for this provider to check for conflicts - const resolved: { ref: RawKeyRef; key: string }[] = [] + const resolved: { ref: RawKeyRef; key: string; source: KeySource }[] = [] for (const ref of refs) { const context = `"${ref.blockName}" in "${ref.workflowName}"` - const { key, envVarFailed } = await resolveKey(ref, context, envLookup) + const { key, source, envVarFailed } = await resolveKey(ref, context, envLookup) if (envVarFailed) stats.envVarFailures++ - if (key?.trim()) resolved.push({ ref, key }) + if (key?.trim()) resolved.push({ ref, key: key.trim(), source }) } if (resolved.length === 0) continue + // Sort by priority: plaintext > workspace > personal + resolved.sort((a, b) => KEY_SOURCE_PRIORITY[a.source] - KEY_SOURCE_PRIORITY[b.source]) + // Detect conflicting values const distinctKeys = new Set(resolved.map((r) => r.key)) if (distinctKeys.size > 1) { stats.conflicts++ console.log(` [CONFLICT] provider "${providerId}": ${distinctKeys.size} distinct keys`) - for (const { ref, key } of resolved) { + for (const { ref, key, source } of resolved) { const display = isEnvVarReference(ref.rawValue) ? 
`${ref.rawValue} -> ${maskKey(key)}` : maskKey(ref.rawValue) - console.log(` "${ref.blockName}" in "${ref.workflowName}": ${display}`) + console.log(` [${source}] "${ref.blockName}" in "${ref.workflowName}": ${display}`) } - console.log(' Using first resolved key') + console.log(` Using highest-priority key (${resolved[0].source})`) } - // Use the first resolved key + // Use the highest-priority resolved key const chosen = resolved[0] if (DRY_RUN) { From 11f402b0318faa05bf19e9dd5de18577ff78a58c Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 9 Mar 2026 21:30:01 -0700 Subject: [PATCH 09/10] Personal key should only be used if its the owner of the workflow --- .../scripts/migrate-block-api-keys-to-byok.ts | 81 +++++++++++++++---- 1 file changed, 67 insertions(+), 14 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index e8bfeff30e5..6539c0741f9 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -20,7 +20,7 @@ // --map jina=jina --user user_abc123 --user user_def456 import { createCipheriv, createDecipheriv, randomBytes } from 'crypto' -import { readFileSync, writeFileSync } from 'fs' +import { appendFileSync, readFileSync, writeFileSync } from 'fs' import { resolve } from 'path' import { eq, sql } from 'drizzle-orm' import { index, json, jsonb, pgTable, text, timestamp, uniqueIndex } from 'drizzle-orm/pg-core' @@ -155,6 +155,11 @@ async function decryptSecret(encryptedValue: string): Promise { } // ---------- Schema ---------- +const workspaceTable = pgTable('workspace', { + id: text('id').primaryKey(), + ownerId: text('owner_id').notNull(), +}) + const workflow = pgTable('workflow', { id: text('id').primaryKey(), userId: text('user_id').notNull(), @@ -272,6 +277,7 @@ function parseToolInputValue(value: unknown): any[] { type RawKeyRef = { rawValue: string blockName: string + workflowId: string 
workflowName: string userId: string } @@ -289,10 +295,15 @@ const KEY_SOURCE_PRIORITY: Record = { personal: 2, } +interface ResolveKeyContext { + workspaceId: string + workspaceOwnerId: string | null +} + async function resolveKey( ref: RawKeyRef, - context: string, - env: EnvLookup + env: EnvLookup, + ctx: ResolveKeyContext ): Promise<{ key: string | null; source: KeySource; envVarFailed: boolean }> { if (!isEnvVarReference(ref.rawValue)) { return { key: ref.rawValue, source: 'plaintext', envVarFailed: false } @@ -308,8 +319,14 @@ async function resolveKey( const encryptedValue = wsValue ?? personalValue const source: KeySource = wsValue ? 'workspace' : 'personal' + const logPrefix = + `workspace=${ctx.workspaceId} owner=${ctx.workspaceOwnerId ?? 'unknown'}` + + ` workflow=${ref.workflowId} user=${ref.userId}` + if (!encryptedValue) { - console.warn(` [WARN] Env var "${varName}" not found (${context})`) + console.warn( + ` [WARN] Env var "${varName}" not found — ${logPrefix} "${ref.blockName}" in "${ref.workflowName}"` + ) return { key: null, source, envVarFailed: true } } @@ -317,7 +334,9 @@ async function resolveKey( const decrypted = await decryptSecret(encryptedValue) return { key: decrypted, source, envVarFailed: false } } catch (error) { - console.warn(` [WARN] Failed to decrypt env var "${varName}" (${context}): ${error}`) + console.warn( + ` [WARN] Failed to decrypt env var "${varName}" — ${logPrefix} "${ref.blockName}" in "${ref.workflowName}": ${error}` + ) return { key: null, source, envVarFailed: true } } } @@ -379,8 +398,8 @@ async function run() { console.log(`Found ${workspaceIds.length} workspaces with candidate blocks\n`) const outPath = resolve('migrate-byok-workspace-ids.txt') - writeFileSync(outPath, `${workspaceIds.join('\n')}\n`) - console.log(`[DRY RUN] Wrote ${workspaceIds.length} workspace IDs to ${outPath}\n`) + writeFileSync(outPath, '') + console.log(`[DRY RUN] Will write workspace IDs with keys to ${outPath}\n`) } else { const raw = 
readFileSync(resolve(FROM_FILE!), 'utf-8') workspaceIds = raw @@ -411,7 +430,16 @@ async function run() { )})${userFilter}` ) - console.log(`[Workspace ${workspaceId}] ${blocks.length} blocks`) + const wsRows = await db + .select({ ownerId: workspaceTable.ownerId }) + .from(workspaceTable) + .where(eq(workspaceTable.id, workspaceId)) + .limit(1) + const workspaceOwnerId = wsRows[0]?.ownerId ?? null + + console.log( + `[Workspace ${workspaceId}] ${blocks.length} blocks, owner=${workspaceOwnerId ?? 'unknown'}` + ) // 2a. Extract all raw key references grouped by provider const providerKeys = new Map() @@ -427,6 +455,7 @@ async function run() { refs.push({ rawValue: val, blockName: block.blockName, + workflowId: block.workflowId, workflowName: block.workflowName, userId: block.userId, }) @@ -447,6 +476,7 @@ async function run() { refs.push({ rawValue: toolApiKey, blockName: `${block.blockName} > tool "${tool.title || toolType}"`, + workflowId: block.workflowId, workflowName: block.workflowName, userId: block.userId, }) @@ -461,6 +491,10 @@ async function run() { continue } + if (DRY_RUN) { + appendFileSync(resolve('migrate-byok-workspace-ids.txt'), `${workspaceId}\n`) + } + // 2b. 
Load env vars only if this workspace has env var references const needsEnvVars = [...providerKeys.values()] .flat() @@ -504,11 +538,21 @@ async function run() { for (const [providerId, refs] of providerKeys) { // Resolve all keys for this provider to check for conflicts const resolved: { ref: RawKeyRef; key: string; source: KeySource }[] = [] + const resolveCtx: ResolveKeyContext = { workspaceId, workspaceOwnerId } for (const ref of refs) { - const context = `"${ref.blockName}" in "${ref.workflowName}"` - const { key, source, envVarFailed } = await resolveKey(ref, context, envLookup) + const { key, source, envVarFailed } = await resolveKey(ref, envLookup, resolveCtx) if (envVarFailed) stats.envVarFailures++ - if (key?.trim()) resolved.push({ ref, key: key.trim(), source }) + if (!key?.trim()) continue + + // For personal env vars, only use the workspace owner's — never another user's + if (source === 'personal' && ref.userId !== workspaceOwnerId) { + console.log( + ` [SKIP-PERSONAL] Ignoring non-owner personal key from user=${ref.userId} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}"` + ) + continue + } + + resolved.push({ ref, key: key.trim(), source }) } if (resolved.length === 0) continue @@ -522,12 +566,18 @@ async function run() { stats.conflicts++ console.log(` [CONFLICT] provider "${providerId}": ${distinctKeys.size} distinct keys`) for (const { ref, key, source } of resolved) { + const isOwner = ref.userId === workspaceOwnerId ? ' (owner)' : '' const display = isEnvVarReference(ref.rawValue) ? `${ref.rawValue} -> ${maskKey(key)}` : maskKey(ref.rawValue) - console.log(` [${source}] "${ref.blockName}" in "${ref.workflowName}": ${display}`) + console.log( + ` [${source}] user=${ref.userId}${isOwner} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}": ${display}` + ) } - console.log(` Using highest-priority key (${resolved[0].source})`) + const chosenIsOwner = resolved[0].ref.userId === workspaceOwnerId ? 
', owner' : '' + console.log( + ` Using highest-priority key (${resolved[0].source}${chosenIsOwner}, user=${resolved[0].ref.userId})` + ) } // Use the highest-priority resolved key @@ -586,7 +636,10 @@ async function run() { console.log(` Env var resolution failures: ${stats.envVarFailures}`) if (DRY_RUN) { - console.log('\n[DRY RUN] No changes were made to the database.') + console.log( + `\n[DRY RUN] Wrote ${stats.workspacesProcessed} workspace IDs (with keys) to migrate-byok-workspace-ids.txt` + ) + console.log('[DRY RUN] No changes were made to the database.') console.log('Run without --dry-run to apply changes.') } else { console.log('\nMigration completed successfully!') From efb23dad5e2219fc81582caceb50ec81699cd779 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 10 Mar 2026 00:18:58 -0700 Subject: [PATCH 10/10] Make migration concurrent --- .../scripts/migrate-block-api-keys-to-byok.ts | 524 ++++++++++-------- 1 file changed, 298 insertions(+), 226 deletions(-) diff --git a/packages/db/scripts/migrate-block-api-keys-to-byok.ts b/packages/db/scripts/migrate-block-api-keys-to-byok.ts index 6539c0741f9..bca9bb581f9 100644 --- a/packages/db/scripts/migrate-block-api-keys-to-byok.ts +++ b/packages/db/scripts/migrate-block-api-keys-to-byok.ts @@ -215,6 +215,10 @@ const workspaceEnvironment = pgTable('workspace_environment', { variables: json('variables').notNull().default('{}'), }) +const WORKSPACE_CONCURRENCY = 100 +const WORKSPACE_BATCH_SIZE = 1000 +const SLEEP_MS = 30_000 + // ---------- DB ---------- const postgresClient = postgres(CONNECTION_STRING, { prepare: false, @@ -225,22 +229,6 @@ const postgresClient = postgres(CONNECTION_STRING, { }) const db = drizzle(postgresClient) -// ---------- Throttle ---------- -const WORKSPACE_BATCH_SIZE = 1000 -const SLEEP_MS = 30_000 -let workspaceCount = 0 - -async function throttleBetweenWorkspaces(workspaceId: string, total: number) { - workspaceCount++ - console.log(` [${workspaceCount}/${total}] Done with 
workspace ${workspaceId}`) - if (workspaceCount % WORKSPACE_BATCH_SIZE === 0) { - console.log( - ` [THROTTLE] ${workspaceCount}/${total} workspaces — sleeping ${SLEEP_MS / 1000}s` - ) - await new Promise((r) => setTimeout(r, SLEEP_MS)) - } -} - // ---------- Helpers ---------- const TOOL_INPUT_SUBBLOCK_IDS: Record = { agent: 'tools', @@ -300,6 +288,47 @@ interface ResolveKeyContext { workspaceOwnerId: string | null } +type MigrationStats = { + workspacesProcessed: number + workspacesSkipped: number + conflicts: number + inserted: number + skippedExisting: number + errors: number + envVarFailures: number +} + +type WorkspaceResult = { + stats: MigrationStats + shouldWriteWorkspaceId: boolean +} + +function createEmptyStats(): MigrationStats { + return { + workspacesProcessed: 0, + workspacesSkipped: 0, + conflicts: 0, + inserted: 0, + skippedExisting: 0, + errors: 0, + envVarFailures: 0, + } +} + +function mergeStats(target: MigrationStats, source: MigrationStats) { + target.workspacesProcessed += source.workspacesProcessed + target.workspacesSkipped += source.workspacesSkipped + target.conflicts += source.conflicts + target.inserted += source.inserted + target.skippedExisting += source.skippedExisting + target.errors += source.errors + target.envVarFailures += source.envVarFailures +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + async function resolveKey( ref: RawKeyRef, env: EnvLookup, @@ -341,6 +370,221 @@ async function resolveKey( } } +async function processWorkspace( + workspaceId: string, + allBlockTypes: string[], + userFilter: ReturnType, + total: number, + index: number +): Promise { + const stats = createEmptyStats() + + try { + const [blocks, wsRows] = await Promise.all([ + db + .select({ + blockId: workflowBlocks.id, + blockName: workflowBlocks.name, + blockType: workflowBlocks.type, + subBlocks: workflowBlocks.subBlocks, + workflowId: workflow.id, + workflowName: workflow.name, + userId: 
workflow.userId, + }) + .from(workflowBlocks) + .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id)) + .where( + sql`${workflow.workspaceId} = ${workspaceId} AND ${workflowBlocks.type} IN (${sql.join( + allBlockTypes.map((t) => sql`${t}`), + sql`, ` + )})${userFilter}` + ), + db + .select({ ownerId: workspaceTable.ownerId }) + .from(workspaceTable) + .where(eq(workspaceTable.id, workspaceId)) + .limit(1), + ]) + + const workspaceOwnerId = wsRows[0]?.ownerId ?? null + + console.log( + `[${index}/${total}] [Workspace ${workspaceId}] ${blocks.length} blocks, owner=${workspaceOwnerId ?? 'unknown'}` + ) + + const providerKeys = new Map() + + for (const block of blocks) { + const subBlocks = block.subBlocks as Record + + const providerId = BLOCK_TYPE_TO_PROVIDER[block.blockType] + if (providerId) { + const val = subBlocks?.apiKey?.value + if (typeof val === 'string' && val.trim()) { + const refs = providerKeys.get(providerId) ?? [] + refs.push({ + rawValue: val, + blockName: block.blockName, + workflowId: block.workflowId, + workflowName: block.workflowName, + userId: block.userId, + }) + providerKeys.set(providerId, refs) + } + } + + const toolInputId = TOOL_INPUT_SUBBLOCK_IDS[block.blockType] + if (toolInputId) { + const tools = parseToolInputValue(subBlocks?.[toolInputId]?.value) + for (const tool of tools) { + const toolType = tool?.type as string | undefined + const toolApiKey = tool?.params?.apiKey as string | undefined + if (!toolType || !toolApiKey || !toolApiKey.trim()) continue + const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType] + if (!toolProviderId) continue + const refs = providerKeys.get(toolProviderId) ?? 
[] + refs.push({ + rawValue: toolApiKey, + blockName: `${block.blockName} > tool "${tool.title || toolType}"`, + workflowId: block.workflowId, + workflowName: block.workflowName, + userId: block.userId, + }) + providerKeys.set(toolProviderId, refs) + } + } + } + + if (providerKeys.size === 0) { + console.log(` [${index}/${total}] No API keys found, skipping\n`) + stats.workspacesSkipped++ + return { stats, shouldWriteWorkspaceId: false } + } + + const needsEnvVars = [...providerKeys.values()] + .flat() + .some((ref) => isEnvVarReference(ref.rawValue)) + + let wsEnvVars: Record = {} + const personalEnvCache = new Map>() + + if (needsEnvVars) { + const wsEnvRows = await db + .select() + .from(workspaceEnvironment) + .where(sql`${workspaceEnvironment.workspaceId} = ${workspaceId}`) + .limit(1) + if (wsEnvRows[0]) { + wsEnvVars = (wsEnvRows[0].variables as Record) || {} + } + + const userIds = [...new Set([...providerKeys.values()].flat().map((r) => r.userId))] + if (userIds.length > 0) { + const personalRows = await db + .select() + .from(environment) + .where( + sql`${environment.userId} IN (${sql.join( + userIds.map((id) => sql`${id}`), + sql`, ` + )})` + ) + for (const row of personalRows) { + personalEnvCache.set(row.userId, (row.variables as Record) || {}) + } + } + } + + const envLookup: EnvLookup = { wsEnvVars, personalEnvCache } + + stats.workspacesProcessed++ + + for (const [providerId, refs] of providerKeys) { + const resolved: { ref: RawKeyRef; key: string; source: KeySource }[] = [] + const resolveCtx: ResolveKeyContext = { workspaceId, workspaceOwnerId } + for (const ref of refs) { + const { key, source, envVarFailed } = await resolveKey(ref, envLookup, resolveCtx) + if (envVarFailed) stats.envVarFailures++ + if (!key?.trim()) continue + + if (source === 'personal' && ref.userId !== workspaceOwnerId) { + console.log( + ` [SKIP-PERSONAL] Ignoring non-owner personal key from user=${ref.userId} workflow=${ref.workflowId} "${ref.blockName}" in 
"${ref.workflowName}"` + ) + continue + } + + resolved.push({ ref, key: key.trim(), source }) + } + + if (resolved.length === 0) continue + + resolved.sort((a, b) => KEY_SOURCE_PRIORITY[a.source] - KEY_SOURCE_PRIORITY[b.source]) + + const distinctKeys = new Set(resolved.map((r) => r.key)) + if (distinctKeys.size > 1) { + stats.conflicts++ + console.log(` [CONFLICT] provider "${providerId}": ${distinctKeys.size} distinct keys`) + for (const { ref, key, source } of resolved) { + const isOwner = ref.userId === workspaceOwnerId ? ' (owner)' : '' + const display = isEnvVarReference(ref.rawValue) + ? `${ref.rawValue} -> ${maskKey(key)}` + : maskKey(ref.rawValue) + console.log( + ` [${source}] user=${ref.userId}${isOwner} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}": ${display}` + ) + } + const chosenIsOwner = resolved[0].ref.userId === workspaceOwnerId ? ', owner' : '' + console.log( + ` Using highest-priority key (${resolved[0].source}${chosenIsOwner}, user=${resolved[0].ref.userId})` + ) + } + + const chosen = resolved[0] + + if (DRY_RUN) { + console.log(` [DRY RUN] Would insert BYOK for provider "${providerId}": ${maskKey(chosen.key)}`) + continue + } + + try { + const encrypted = await encryptSecret(chosen.key) + const result = await db + .insert(workspaceBYOKKeys) + .values({ + id: uuidv4(), + workspaceId, + providerId, + encryptedApiKey: encrypted, + createdBy: chosen.ref.userId, + }) + .onConflictDoNothing({ + target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId], + }) + .returning({ id: workspaceBYOKKeys.id }) + + if (result.length === 0) { + console.log(` [SKIP] BYOK already exists for provider "${providerId}"`) + stats.skippedExisting++ + } else { + console.log(` [INSERT] BYOK for provider "${providerId}": ${maskKey(chosen.key)}`) + stats.inserted++ + } + } catch (error) { + console.error(` [ERROR] Failed to insert BYOK for provider "${providerId}":`, error) + stats.errors++ + } + } + + console.log(` [${index}/${total}] 
Done with workspace ${workspaceId}\n`) + return { stats, shouldWriteWorkspaceId: DRY_RUN } + } catch (error) { + console.error(` [ERROR] Failed workspace ${workspaceId}:`, error) + stats.errors++ + return { stats, shouldWriteWorkspaceId: false } + } +} + // ---------- Main ---------- async function run() { console.log(`Mode: ${DRY_RUN ? 'DRY RUN (audit + preview)' : 'LIVE'}`) @@ -353,15 +597,7 @@ async function run() { if (FROM_FILE) console.log(`From file: ${FROM_FILE}`) console.log('---\n') - const stats = { - workspacesProcessed: 0, - workspacesSkipped: 0, - conflicts: 0, - inserted: 0, - skippedExisting: 0, - errors: 0, - envVarFailures: 0, - } + const stats = createEmptyStats() try { // 1. Get distinct workspace IDs that have matching blocks @@ -409,219 +645,55 @@ async function run() { console.log(`Loaded ${workspaceIds.length} workspace IDs from ${FROM_FILE}\n`) } - // 2. Process one workspace at a time - for (const workspaceId of workspaceIds) { - const blocks = await db - .select({ - blockId: workflowBlocks.id, - blockName: workflowBlocks.name, - blockType: workflowBlocks.type, - subBlocks: workflowBlocks.subBlocks, - workflowId: workflow.id, - workflowName: workflow.name, - userId: workflow.userId, - }) - .from(workflowBlocks) - .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id)) - .where( - sql`${workflow.workspaceId} = ${workspaceId} AND ${workflowBlocks.type} IN (${sql.join( - allBlockTypes.map((t) => sql`${t}`), - sql`, ` - )})${userFilter}` - ) - - const wsRows = await db - .select({ ownerId: workspaceTable.ownerId }) - .from(workspaceTable) - .where(eq(workspaceTable.id, workspaceId)) - .limit(1) - const workspaceOwnerId = wsRows[0]?.ownerId ?? null - + // 2. 
Process workspaces in parallel groups of WORKSPACE_CONCURRENCY, pausing SLEEP_MS (30s) after each WORKSPACE_BATCH_SIZE
+    for (let batchStart = 0; batchStart < workspaceIds.length; batchStart += WORKSPACE_BATCH_SIZE) {
+      const batch = workspaceIds.slice(batchStart, batchStart + WORKSPACE_BATCH_SIZE)
       console.log(
-        `[Workspace ${workspaceId}] ${blocks.length} blocks, owner=${workspaceOwnerId ?? 'unknown'}`
+        `[BATCH] Processing workspaces ${batchStart + 1}-${batchStart + batch.length} of ${workspaceIds.length}`
       )
 
-      // 2a. Extract all raw key references grouped by provider
-      const providerKeys = new Map()
-
-      for (const block of blocks) {
-        const subBlocks = block.subBlocks as Record
-
-        const providerId = BLOCK_TYPE_TO_PROVIDER[block.blockType]
-        if (providerId) {
-          const val = subBlocks?.apiKey?.value
-          if (typeof val === 'string' && val.trim()) {
-            const refs = providerKeys.get(providerId) ?? []
-            refs.push({
-              rawValue: val,
-              blockName: block.blockName,
-              workflowId: block.workflowId,
-              workflowName: block.workflowName,
-              userId: block.userId,
-            })
-            providerKeys.set(providerId, refs)
-          }
-        }
-
-        const toolInputId = TOOL_INPUT_SUBBLOCK_IDS[block.blockType]
-        if (toolInputId) {
-          const tools = parseToolInputValue(subBlocks?.[toolInputId]?.value)
-          for (const tool of tools) {
-            const toolType = tool?.type as string | undefined
-            const toolApiKey = tool?.params?.apiKey as string | undefined
-            if (!toolType || !toolApiKey || !toolApiKey.trim()) continue
-            const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType]
-            if (!toolProviderId) continue
-            const refs = providerKeys.get(toolProviderId) ?? 
[] - refs.push({ - rawValue: toolApiKey, - blockName: `${block.blockName} > tool "${tool.title || toolType}"`, - workflowId: block.workflowId, - workflowName: block.workflowName, - userId: block.userId, - }) - providerKeys.set(toolProviderId, refs) - } - } - } - - if (providerKeys.size === 0) { - console.log(' No API keys found, skipping\n') - stats.workspacesSkipped++ - continue - } - - if (DRY_RUN) { - appendFileSync(resolve('migrate-byok-workspace-ids.txt'), `${workspaceId}\n`) - } - - // 2b. Load env vars only if this workspace has env var references - const needsEnvVars = [...providerKeys.values()] - .flat() - .some((ref) => isEnvVarReference(ref.rawValue)) - - let wsEnvVars: Record = {} - const personalEnvCache = new Map>() - - if (needsEnvVars) { - const wsEnvRows = await db - .select() - .from(workspaceEnvironment) - .where(sql`${workspaceEnvironment.workspaceId} = ${workspaceId}`) - .limit(1) - if (wsEnvRows[0]) { - wsEnvVars = (wsEnvRows[0].variables as Record) || {} - } - - const userIds = [...new Set([...providerKeys.values()].flat().map((r) => r.userId))] - if (userIds.length > 0) { - const personalRows = await db - .select() - .from(environment) - .where( - sql`${environment.userId} IN (${sql.join( - userIds.map((id) => sql`${id}`), - sql`, ` - )})` - ) - for (const row of personalRows) { - personalEnvCache.set(row.userId, (row.variables as Record) || {}) - } - } - } - - const envLookup: EnvLookup = { wsEnvVars, personalEnvCache } - - // 2c. 
For each provider, detect conflicts then resolve and insert - stats.workspacesProcessed++ - - for (const [providerId, refs] of providerKeys) { - // Resolve all keys for this provider to check for conflicts - const resolved: { ref: RawKeyRef; key: string; source: KeySource }[] = [] - const resolveCtx: ResolveKeyContext = { workspaceId, workspaceOwnerId } - for (const ref of refs) { - const { key, source, envVarFailed } = await resolveKey(ref, envLookup, resolveCtx) - if (envVarFailed) stats.envVarFailures++ - if (!key?.trim()) continue - - // For personal env vars, only use the workspace owner's — never another user's - if (source === 'personal' && ref.userId !== workspaceOwnerId) { - console.log( - ` [SKIP-PERSONAL] Ignoring non-owner personal key from user=${ref.userId} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}"` - ) - continue - } - - resolved.push({ ref, key: key.trim(), source }) - } - - if (resolved.length === 0) continue - - // Sort by priority: plaintext > workspace > personal - resolved.sort((a, b) => KEY_SOURCE_PRIORITY[a.source] - KEY_SOURCE_PRIORITY[b.source]) - - // Detect conflicting values - const distinctKeys = new Set(resolved.map((r) => r.key)) - if (distinctKeys.size > 1) { - stats.conflicts++ - console.log(` [CONFLICT] provider "${providerId}": ${distinctKeys.size} distinct keys`) - for (const { ref, key, source } of resolved) { - const isOwner = ref.userId === workspaceOwnerId ? ' (owner)' : '' - const display = isEnvVarReference(ref.rawValue) - ? 
`${ref.rawValue} -> ${maskKey(key)}` - : maskKey(ref.rawValue) - console.log( - ` [${source}] user=${ref.userId}${isOwner} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}": ${display}` + for ( + let concurrencyStart = 0; + concurrencyStart < batch.length; + concurrencyStart += WORKSPACE_CONCURRENCY + ) { + const workspaceChunk = batch.slice( + concurrencyStart, + concurrencyStart + WORKSPACE_CONCURRENCY + ) + const results = await Promise.all( + workspaceChunk.map((workspaceId, chunkIndex) => + processWorkspace( + workspaceId, + allBlockTypes, + userFilter, + workspaceIds.length, + batchStart + concurrencyStart + chunkIndex + 1 ) - } - const chosenIsOwner = resolved[0].ref.userId === workspaceOwnerId ? ', owner' : '' - console.log( - ` Using highest-priority key (${resolved[0].source}${chosenIsOwner}, user=${resolved[0].ref.userId})` ) - } - - // Use the highest-priority resolved key - const chosen = resolved[0] + ) if (DRY_RUN) { - console.log( - ` [DRY RUN] Would insert BYOK for provider "${providerId}": ${maskKey(chosen.key)}` - ) - continue - } + const workspaceIdsWithKeys = results + .map((result, resultIndex) => (result.shouldWriteWorkspaceId ? 
workspaceChunk[resultIndex] : null)) + .filter((id): id is string => id !== null) - // Insert into BYOK - try { - const encrypted = await encryptSecret(chosen.key) - const result = await db - .insert(workspaceBYOKKeys) - .values({ - id: uuidv4(), - workspaceId, - providerId, - encryptedApiKey: encrypted, - createdBy: chosen.ref.userId, - }) - .onConflictDoNothing({ - target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId], - }) - .returning({ id: workspaceBYOKKeys.id }) - - if (result.length === 0) { - console.log(` [SKIP] BYOK already exists for provider "${providerId}"`) - stats.skippedExisting++ - } else { - console.log(` [INSERT] BYOK for provider "${providerId}": ${maskKey(chosen.key)}`) - stats.inserted++ + if (workspaceIdsWithKeys.length > 0) { + appendFileSync(resolve('migrate-byok-workspace-ids.txt'), `${workspaceIdsWithKeys.join('\n')}\n`) } - } catch (error) { - console.error(` [ERROR] Failed to insert BYOK for provider "${providerId}":`, error) - stats.errors++ + } + + for (const result of results) { + mergeStats(stats, result.stats) } } - console.log() - await throttleBetweenWorkspaces(workspaceId, workspaceIds.length) + if (batchStart + batch.length < workspaceIds.length) { + console.log( + ` [THROTTLE] ${batchStart + batch.length}/${workspaceIds.length} workspaces — sleeping ${SLEEP_MS / 1000}s` + ) + await sleep(SLEEP_MS) + } } // 3. Summary