From 1e4b3e09f3287f0af4c25a925c11cf955433db20 Mon Sep 17 00:00:00 2001 From: Maxwell Calkin Date: Mon, 9 Mar 2026 04:07:24 -0400 Subject: [PATCH] fix: resolve pause-resume race condition with transaction + retry (#3081) When a resume request arrives before persistPauseResult finishes committing the paused execution record, enqueueOrStartResume fails with "Paused execution not found". This is critical for high-throughput automation where external systems resume milliseconds after pause. Two complementary fixes: 1. Wrap persistPauseResult's DB insert + queue processing in a single transaction so the pause record and any pending resume claims are atomically visible. The SELECT FOR UPDATE in enqueueOrStartResume will block on the row lock until this transaction commits. 2. Add exponential backoff retry (5 attempts, 50ms base delay) in enqueueOrStartResume for the specific "not found" error, handling the window where the row doesn't exist yet and SELECT FOR UPDATE can't acquire a lock on a non-existent row. Only the race-condition-specific error is retried; other errors (already resumed, snapshot not ready, etc.) fail immediately. Closes #3081 Co-Authored-By: Claude Opus 4.6 --- .../human-in-the-loop-manager.test.ts | 288 ++++++++++++++++++ .../executor/human-in-the-loop-manager.ts | 141 +++++++-- 2 files changed, 405 insertions(+), 24 deletions(-) create mode 100644 apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts new file mode 100644 index 00000000000..e25557b940b --- /dev/null +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts @@ -0,0 +1,288 @@ +/** + * @vitest-environment node + * + * Tests for the pause-resume race condition fix in PauseResumeManager. 
+ * Verifies that enqueueOrStartResume retries with exponential backoff + * when the paused execution record has not yet been persisted. + */ +import { loggerMock } from '@sim/testing' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/logger', () => loggerMock) + +vi.mock('@sim/db', () => ({ + db: { + transaction: vi.fn(), + insert: vi.fn(), + select: vi.fn(), + update: vi.fn(), + }, +})) + +vi.mock('@sim/db/schema', () => ({ + pausedExecutions: { + executionId: 'executionId', + id: 'id', + }, + resumeQueue: { + id: 'id', + parentExecutionId: 'parentExecutionId', + status: 'status', + queuedAt: 'queuedAt', + }, + workflowExecutionLogs: {}, +})) + +vi.mock('drizzle-orm', () => ({ + eq: vi.fn((...args: unknown[]) => args), + and: vi.fn((...args: unknown[]) => args), + asc: vi.fn((col: unknown) => col), + desc: vi.fn((col: unknown) => col), + inArray: vi.fn((...args: unknown[]) => args), + lt: vi.fn((...args: unknown[]) => args), + sql: Object.assign(vi.fn(), { raw: vi.fn() }), +})) + +vi.mock('@/lib/core/execution-limits', () => ({ + createTimeoutAbortController: vi.fn(), + getTimeoutErrorMessage: vi.fn(), +})) + +vi.mock('@/lib/execution/preprocessing', () => ({ + preprocessExecution: vi.fn(), +})) + +vi.mock('@/lib/logs/execution/logging-session', () => ({ + LoggingSession: vi.fn(), +})) + +vi.mock('@/lib/workflows/executor/execution-core', () => ({ + executeWorkflowCore: vi.fn(), +})) + +vi.mock('@/executor/execution/snapshot', () => ({ + ExecutionSnapshot: vi.fn(), +})) + +vi.mock('@/executor/utils/output-filter', () => ({ + filterOutputForLog: vi.fn(), +})) + +import { db } from '@sim/db' +import { PauseResumeManager } from './human-in-the-loop-manager' + +describe('PauseResumeManager', () => { + beforeEach(() => { + vi.clearAllMocks() + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + /** + * Creates a mock transaction object that simulates Drizzle query chains. 
+ * The pausedExecution lookup uses: select().from().where().for('update').limit(1).then() + * The activeResume lookup uses: select({id}).from().where().limit(1).then() + */ + function createMockTx(pausedExecution: Record | null) { + // Build a reusable terminal chain that resolves to [] + const emptyTerminal = () => ({ + limit: vi.fn().mockReturnValue({ + then: vi + .fn() + .mockImplementation((resolve: (rows: unknown[]) => unknown) => resolve([])), + }), + then: vi + .fn() + .mockImplementation((resolve: (rows: unknown[]) => unknown) => resolve([])), + }) + + // The first select() call is the pausedExecution lookup (with .for('update')) + // The second select() call is the activeResume check (no .for()) + let selectCallCount = 0 + + return { + select: vi.fn().mockImplementation(() => { + selectCallCount++ + const isFirstSelect = selectCallCount === 1 + + return { + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + // .for('update') path — pausedExecution lookup + for: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue({ + then: vi.fn().mockImplementation( + (resolve: (rows: unknown[]) => unknown) => + resolve(isFirstSelect && pausedExecution ? 
[pausedExecution] : []) + ), + }), + }), + // .limit() path (no .for()) — activeResume lookup + limit: vi.fn().mockReturnValue({ + then: vi.fn().mockImplementation( + (resolve: (rows: unknown[]) => unknown) => resolve([]) + ), + }), + // Direct .then() path + then: vi.fn().mockImplementation( + (resolve: (rows: unknown[]) => unknown) => resolve([]) + ), + }), + }), + } + }), + insert: vi.fn().mockReturnValue({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockResolvedValue([{ id: 'rq-1', queuedAt: new Date() }]), + }), + }), + update: vi.fn().mockReturnValue({ + set: vi.fn().mockReturnValue({ + where: vi.fn().mockResolvedValue(undefined), + }), + }), + } + } + + function createValidPausedExecution() { + return { + id: 'pe-1', + executionId: 'exec-1', + workflowId: 'wf-1', + pausePoints: { + 'ctx-1': { + contextId: 'ctx-1', + blockId: 'block-1', + resumeStatus: 'paused', + snapshotReady: true, + }, + }, + } + } + + describe('enqueueOrStartResume - retry on race condition', () => { + it('should retry when paused execution is not found and succeed on later attempt', async () => { + let callCount = 0 + const mockedTransaction = vi.mocked(db.transaction) + + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise) => { + callCount++ + if (callCount <= 2) { + return callback(createMockTx(null)) + } + return callback(createMockTx(createValidPausedExecution())) + } + ) + + const resultPromise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: { value: 'test' }, + userId: 'user-1', + }) + + // Advance timers for retry delays (50ms, 100ms) + await vi.advanceTimersByTimeAsync(50) + await vi.advanceTimersByTimeAsync(100) + + const result = await resultPromise + + // Should have retried: 2 failures + 1 success = 3 calls + expect(callCount).toBe(3) + expect(result.status).toBe('starting') + expect(result.resumeExecutionId).toBe('exec-1') + }) + + it('should throw after exhausting all 
retry attempts', async () => { + const mockedTransaction = vi.mocked(db.transaction) + + // All attempts fail — pause record never appears + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise) => { + return callback(createMockTx(null)) + } + ) + + // Capture the rejection to prevent unhandled rejection warnings + let caughtError: Error | undefined + const resultPromise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: {}, + userId: 'user-1', + }).catch((err: Error) => { + caughtError = err + }) + + // Advance timers for all retry delays: 50, 100, 200, 400ms + await vi.advanceTimersByTimeAsync(800) + await resultPromise + + expect(caughtError).toBeDefined() + expect(caughtError!.message).toBe('Paused execution not found or already resumed') + }) + + it('should not retry for non-race-condition errors', async () => { + let callCount = 0 + const mockedTransaction = vi.mocked(db.transaction) + + const alreadyResumedExecution = { + ...createValidPausedExecution(), + pausePoints: { + 'ctx-1': { + contextId: 'ctx-1', + blockId: 'block-1', + resumeStatus: 'resumed', // Already resumed + snapshotReady: true, + }, + }, + } + + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise) => { + callCount++ + return callback(createMockTx(alreadyResumedExecution)) + } + ) + + await expect( + PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: {}, + userId: 'user-1', + }) + ).rejects.toThrow('Pause point already resumed or in progress') + + // Should NOT retry — this is a different error + expect(callCount).toBe(1) + }) + + it('should succeed immediately when paused execution exists on first try', async () => { + let callCount = 0 + const mockedTransaction = vi.mocked(db.transaction) + + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise) => { + callCount++ + return 
callback(createMockTx(createValidPausedExecution())) + } + ) + + const result = await PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: { value: 'test' }, + userId: 'user-1', + }) + + // No retries needed + expect(callCount).toBe(1) + expect(result.status).toBe('starting') + }) + }) +}) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 5649cf40e65..de9efee4ab0 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -102,6 +102,18 @@ interface StartResumeExecutionArgs { } export class PauseResumeManager { + /** + * Maximum number of retry attempts when a resume request arrives before + * the pause state has been persisted to the database. + */ + private static readonly RESUME_RETRY_MAX_ATTEMPTS = 5 + + /** + * Base delay in milliseconds between retry attempts. Each retry doubles + * the delay: 50ms, 100ms, 200ms, 400ms (the final attempt fails without waiting). + */ + private static readonly RESUME_RETRY_BASE_DELAY_MS = 50 + static async persistPauseResult(args: PersistPauseResultArgs): Promise { const { workflowId, executionId, pausePoints, snapshotSeed, executorUserId } = args @@ -122,28 +134,18 @@ export class PauseResumeManager { const now = new Date() - await db - .insert(pausedExecutions) - .values({ - id: randomUUID(), - workflowId, - executionId, - executionSnapshot: snapshotSeed, - pausePoints: pausePointsRecord, - totalPauseCount: pausePoints.length, - resumedCount: 0, - status: 'paused', - metadata: { - pauseScope: 'execution', - triggerIds: snapshotSeed.triggerIds, - executorUserId: executorUserId ?? null, - }, - pausedAt: now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: pausedExecutions.executionId, - set: { + // Use a transaction to ensure the pause state is atomically visible + // to concurrent resume requests. 
The SELECT FOR UPDATE in + // enqueueOrStartResume will block until this transaction commits when + // the row already exists; a resume request that arrives before the + // pause record exists is covered by the retry in enqueueOrStartResume. + await db.transaction(async (tx) => { + await tx + .insert(pausedExecutions) + .values({ + id: randomUUID(), + workflowId, + executionId, executionSnapshot: snapshotSeed, pausePoints: pausePointsRecord, totalPauseCount: pausePoints.length, @@ -154,16 +156,107 @@ export class PauseResumeManager { triggerIds: snapshotSeed.triggerIds, executorUserId: executorUserId ?? null, }, + pausedAt: now, updatedAt: now, - }, - }) + }) + .onConflictDoUpdate({ + target: pausedExecutions.executionId, + set: { + executionSnapshot: snapshotSeed, + pausePoints: pausePointsRecord, + totalPauseCount: pausePoints.length, + resumedCount: 0, + status: 'paused', + metadata: { + pauseScope: 'execution', + triggerIds: snapshotSeed.triggerIds, + executorUserId: executorUserId ?? null, + }, + updatedAt: now, + }, + }) + + // Process any resume requests that were queued while the execution + // was being persisted — within the same transaction so the row lock + // enqueueOrStartResume waits on is released only after both the + // insert and the queue drain are complete. + const pendingEntry = await tx + .select() + .from(resumeQueue) + .where( + and( + eq(resumeQueue.parentExecutionId, executionId), + eq(resumeQueue.status, 'pending') + ) + ) + .orderBy(asc(resumeQueue.queuedAt)) + .limit(1) + .then((rows) => rows[0]) + if (pendingEntry) { + await tx + .update(resumeQueue) + .set({ status: 'claimed', claimedAt: new Date() }) + .where(eq(resumeQueue.id, pendingEntry.id)) + + await tx + .update(pausedExecutions) + .set({ + pausePoints: sql`jsonb_set(pause_points, ARRAY[${pendingEntry.contextId}, 'resumeStatus'], '"resuming"'::jsonb)`, + }) + .where(eq(pausedExecutions.executionId, executionId)) + } + }) + + // After the transaction commits (pause record is visible), process + // any queued resumes. 
This also handles the case where no pending + // entries existed inside the transaction but arrived right after. await PauseResumeManager.processQueuedResumes(executionId) } static async enqueueOrStartResume(args: EnqueueResumeArgs): Promise { const { executionId, contextId, resumeInput, userId } = args + // Retry with exponential backoff to handle the race condition where + // a resume request arrives before persistPauseResult has committed + // the pause record to the database. The SELECT FOR UPDATE in the + // transaction will block if a concurrent persistPauseResult holds + // the row lock, but if the row doesn't exist yet we need to retry. + for ( + let attempt = 0; + attempt < PauseResumeManager.RESUME_RETRY_MAX_ATTEMPTS; + attempt++ + ) { + try { + return await PauseResumeManager.tryEnqueueOrStartResume(args) + } catch (error: unknown) { + const isPausedNotFound = + error instanceof Error && + error.message === 'Paused execution not found or already resumed' + + if (!isPausedNotFound || attempt >= PauseResumeManager.RESUME_RETRY_MAX_ATTEMPTS - 1) { + throw error + } + + const delay = + PauseResumeManager.RESUME_RETRY_BASE_DELAY_MS * Math.pow(2, attempt) + logger.info( + `Paused execution not found yet, retrying in ${delay}ms (attempt ${attempt + 1}/${PauseResumeManager.RESUME_RETRY_MAX_ATTEMPTS})`, + { executionId, contextId } + ) + await new Promise((resolve) => setTimeout(resolve, delay)) + } + } + + // This is unreachable but satisfies TypeScript + throw new Error('Paused execution not found or already resumed') + } + + private static async tryEnqueueOrStartResume( + args: EnqueueResumeArgs + ): Promise { + const { executionId, contextId, resumeInput, userId } = args + return await db.transaction(async (tx) => { const pausedExecution = await tx .select()