diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts new file mode 100644 index 00000000000..e25557b940b --- /dev/null +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts @@ -0,0 +1,288 @@ +/** + * @vitest-environment node + * + * Tests for the pause-resume race condition fix in PauseResumeManager. + * Verifies that enqueueOrStartResume retries with exponential backoff + * when the paused execution record has not yet been persisted. + */ +import { loggerMock } from '@sim/testing' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/logger', () => loggerMock) + +vi.mock('@sim/db', () => ({ + db: { + transaction: vi.fn(), + insert: vi.fn(), + select: vi.fn(), + update: vi.fn(), + }, +})) + +vi.mock('@sim/db/schema', () => ({ + pausedExecutions: { + executionId: 'executionId', + id: 'id', + }, + resumeQueue: { + id: 'id', + parentExecutionId: 'parentExecutionId', + status: 'status', + queuedAt: 'queuedAt', + }, + workflowExecutionLogs: {}, +})) + +vi.mock('drizzle-orm', () => ({ + eq: vi.fn((...args: unknown[]) => args), + and: vi.fn((...args: unknown[]) => args), + asc: vi.fn((col: unknown) => col), + desc: vi.fn((col: unknown) => col), + inArray: vi.fn((...args: unknown[]) => args), + lt: vi.fn((...args: unknown[]) => args), + sql: Object.assign(vi.fn(), { raw: vi.fn() }), +})) + +vi.mock('@/lib/core/execution-limits', () => ({ + createTimeoutAbortController: vi.fn(), + getTimeoutErrorMessage: vi.fn(), +})) + +vi.mock('@/lib/execution/preprocessing', () => ({ + preprocessExecution: vi.fn(), +})) + +vi.mock('@/lib/logs/execution/logging-session', () => ({ + LoggingSession: vi.fn(), +})) + +vi.mock('@/lib/workflows/executor/execution-core', () => ({ + executeWorkflowCore: vi.fn(), +})) + +vi.mock('@/executor/execution/snapshot', () => ({ + ExecutionSnapshot: vi.fn(), +})) + +vi.mock('@/executor/utils/output-filter', () 
=> ({ + filterOutputForLog: vi.fn(), +})) + +import { db } from '@sim/db' +import { PauseResumeManager } from './human-in-the-loop-manager' + +describe('PauseResumeManager', () => { + beforeEach(() => { + vi.clearAllMocks() + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + /** + * Creates a mock transaction object that simulates Drizzle query chains. + * The pausedExecution lookup uses: select().from().where().for('update').limit(1).then() + * The activeResume lookup uses: select({id}).from().where().limit(1).then() + */ + function createMockTx(pausedExecution: Record<string, unknown> | null) { + // Reusable terminal chain that resolves to [] — NOTE(review): currently unused in this file + const emptyTerminal = () => ({ + limit: vi.fn().mockReturnValue({ + then: vi + .fn() + .mockImplementation((resolve: (rows: unknown[]) => unknown) => resolve([])), + }), + then: vi + .fn() + .mockImplementation((resolve: (rows: unknown[]) => unknown) => resolve([])), + }) + + // The first select() call is the pausedExecution lookup (with .for('update')) + // The second select() call is the activeResume check (no .for()) + let selectCallCount = 0 + + return { + select: vi.fn().mockImplementation(() => { + selectCallCount++ + const isFirstSelect = selectCallCount === 1 + + return { + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + // .for('update') path — pausedExecution lookup + for: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue({ + then: vi.fn().mockImplementation( + (resolve: (rows: unknown[]) => unknown) => + resolve(isFirstSelect && pausedExecution ? 
[pausedExecution] : []) + ), + }), + }), + // .limit() path (no .for()) — activeResume lookup + limit: vi.fn().mockReturnValue({ + then: vi.fn().mockImplementation( + (resolve: (rows: unknown[]) => unknown) => resolve([]) + ), + }), + // Direct .then() path + then: vi.fn().mockImplementation( + (resolve: (rows: unknown[]) => unknown) => resolve([]) + ), + }), + }), + } + }), + insert: vi.fn().mockReturnValue({ + values: vi.fn().mockReturnValue({ + returning: vi.fn().mockResolvedValue([{ id: 'rq-1', queuedAt: new Date() }]), + }), + }), + update: vi.fn().mockReturnValue({ + set: vi.fn().mockReturnValue({ + where: vi.fn().mockResolvedValue(undefined), + }), + }), + } + } + + function createValidPausedExecution() { + return { + id: 'pe-1', + executionId: 'exec-1', + workflowId: 'wf-1', + pausePoints: { + 'ctx-1': { + contextId: 'ctx-1', + blockId: 'block-1', + resumeStatus: 'paused', + snapshotReady: true, + }, + }, + } + } + + describe('enqueueOrStartResume - retry on race condition', () => { + it('should retry when paused execution is not found and succeed on later attempt', async () => { + let callCount = 0 + const mockedTransaction = vi.mocked(db.transaction) + + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise<unknown>) => { + callCount++ + if (callCount <= 2) { + return callback(createMockTx(null)) + } + return callback(createMockTx(createValidPausedExecution())) + } + ) + + const resultPromise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: { value: 'test' }, + userId: 'user-1', + }) + + // Advance timers for retry delays (50ms, 100ms) + await vi.advanceTimersByTimeAsync(50) + await vi.advanceTimersByTimeAsync(100) + + const result = await resultPromise + + // Should have retried: 2 failures + 1 success = 3 calls + expect(callCount).toBe(3) + expect(result.status).toBe('starting') + expect(result.resumeExecutionId).toBe('exec-1') + }) + + it('should throw after exhausting all 
retry attempts', async () => { + const mockedTransaction = vi.mocked(db.transaction) + + // All attempts fail — pause record never appears + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise<unknown>) => { + return callback(createMockTx(null)) + } + ) + + // Capture the rejection to prevent unhandled rejection warnings + let caughtError: Error | undefined + const resultPromise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: {}, + userId: 'user-1', + }).catch((err: Error) => { + caughtError = err + }) + + // Advance past all retry delays: 50 + 100 + 200 + 400 = 750ms + await vi.advanceTimersByTimeAsync(800) + await resultPromise + + expect(caughtError).toBeDefined() + expect(caughtError?.message).toBe('Paused execution not found or already resumed') + }) + + it('should not retry for non-race-condition errors', async () => { + let callCount = 0 + const mockedTransaction = vi.mocked(db.transaction) + + const alreadyResumedExecution = { + ...createValidPausedExecution(), + pausePoints: { + 'ctx-1': { + contextId: 'ctx-1', + blockId: 'block-1', + resumeStatus: 'resumed', // Already resumed + snapshotReady: true, + }, + }, + } + + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise<unknown>) => { + callCount++ + return callback(createMockTx(alreadyResumedExecution)) + } + ) + + await expect( + PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: {}, + userId: 'user-1', + }) + ).rejects.toThrow('Pause point already resumed or in progress') + + // Should NOT retry — this is a different error + expect(callCount).toBe(1) + }) + + it('should succeed immediately when paused execution exists on first try', async () => { + let callCount = 0 + const mockedTransaction = vi.mocked(db.transaction) + + mockedTransaction.mockImplementation( + async (callback: (tx: unknown) => Promise<unknown>) => { + callCount++ + return 
callback(createMockTx(createValidPausedExecution())) + } + ) + + const result = await PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: { value: 'test' }, + userId: 'user-1', + }) + + // No retries needed + expect(callCount).toBe(1) + expect(result.status).toBe('starting') + }) + }) +}) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 5649cf40e65..de9efee4ab0 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -102,6 +102,18 @@ interface StartResumeExecutionArgs { } export class PauseResumeManager { + /** + * Maximum number of attempts (the initial try included) when a resume request + * arrives before the pause state has been persisted to the database. + */ + private static readonly RESUME_RETRY_MAX_ATTEMPTS = 5 + + /** + * Base delay in milliseconds between attempts. Each retry doubles the delay + * (exponential backoff): 50ms, 100ms, 200ms, 400ms; no delay follows the last attempt. + */ + private static readonly RESUME_RETRY_BASE_DELAY_MS = 50 + static async persistPauseResult(args: PersistPauseResultArgs): Promise { const { workflowId, executionId, pausePoints, snapshotSeed, executorUserId } = args @@ -122,28 +134,18 @@ export class PauseResumeManager { const now = new Date() - await db - .insert(pausedExecutions) - .values({ - id: randomUUID(), - workflowId, - executionId, - executionSnapshot: snapshotSeed, - pausePoints: pausePointsRecord, - totalPauseCount: pausePoints.length, - resumedCount: 0, - status: 'paused', - metadata: { - pauseScope: 'execution', - triggerIds: snapshotSeed.triggerIds, - executorUserId: executorUserId ?? null, - }, - pausedAt: now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: pausedExecutions.executionId, - set: { + // Use a transaction to ensure the pause state is atomically visible + // to concurrent resume requests. 
The SELECT FOR UPDATE in + // enqueueOrStartResume can only wait on this transaction when the row + // already exists (conflict-update path); a row being inserted for the + // first time is not visible until commit, so that caller retries instead. + await db.transaction(async (tx) => { + await tx + .insert(pausedExecutions) + .values({ + id: randomUUID(), + workflowId, + executionId, executionSnapshot: snapshotSeed, pausePoints: pausePointsRecord, totalPauseCount: pausePoints.length, @@ -154,16 +156,107 @@ export class PauseResumeManager { triggerIds: snapshotSeed.triggerIds, executorUserId: executorUserId ?? null, }, + pausedAt: now, updatedAt: now, - }, - }) + }) + .onConflictDoUpdate({ + target: pausedExecutions.executionId, + set: { + executionSnapshot: snapshotSeed, + pausePoints: pausePointsRecord, + totalPauseCount: pausePoints.length, + resumedCount: 0, + status: 'paused', + metadata: { + pauseScope: 'execution', + triggerIds: snapshotSeed.triggerIds, + executorUserId: executorUserId ?? null, + }, + updatedAt: now, + }, + }) + + // Claim the oldest pending resume request for this execution, if any, + // inside the same transaction so the upsert and the claim commit + // together. NOTE(review): nothing in this block starts the claimed + // entry — confirm processQueuedResumes handles 'claimed' rows. + const pendingEntry = await tx + .select() + .from(resumeQueue) + .where( + and( + eq(resumeQueue.parentExecutionId, executionId), + eq(resumeQueue.status, 'pending') + ) + ) + .orderBy(asc(resumeQueue.queuedAt)) + .limit(1) + .then((rows) => rows[0]) + if (pendingEntry) { + await tx + .update(resumeQueue) + .set({ status: 'claimed', claimedAt: new Date() }) + .where(eq(resumeQueue.id, pendingEntry.id)) + + await tx + .update(pausedExecutions) + .set({ + pausePoints: sql`jsonb_set(pause_points, ARRAY[${pendingEntry.contextId}, 'resumeStatus'], '"resuming"'::jsonb)`, + }) + .where(eq(pausedExecutions.executionId, executionId)) + } + }) + + // After the transaction commits (pause record is visible), process + // any queued resumes. 
This also handles the case where no pending + // entries existed inside the transaction but arrived right after. await PauseResumeManager.processQueuedResumes(executionId) } static async enqueueOrStartResume(args: EnqueueResumeArgs): Promise { const { executionId, contextId, resumeInput, userId } = args + // Retry with exponential backoff (delays of 50, 100, 200, 400ms across + // at most five attempts) when a resume request arrives before + // persistPauseResult has committed the pause record. SELECT FOR UPDATE + // waits on a row that is locked, but a row that does not exist yet is + // simply not found, so that specific error is retried (matched by message). + for ( + let attempt = 0; + attempt < PauseResumeManager.RESUME_RETRY_MAX_ATTEMPTS; + attempt++ + ) { + try { + return await PauseResumeManager.tryEnqueueOrStartResume(args) + } catch (error: unknown) { + const isPausedNotFound = + error instanceof Error && + error.message === 'Paused execution not found or already resumed' + + if (!isPausedNotFound || attempt >= PauseResumeManager.RESUME_RETRY_MAX_ATTEMPTS - 1) { + throw error + } + + const delay = + PauseResumeManager.RESUME_RETRY_BASE_DELAY_MS * Math.pow(2, attempt) + logger.info( + `Paused execution not found yet, retrying in ${delay}ms (attempt ${attempt + 1}/${PauseResumeManager.RESUME_RETRY_MAX_ATTEMPTS})`, + { executionId, contextId } + ) + await new Promise((resolve) => setTimeout(resolve, delay)) + } + } + + // Unreachable: the final attempt always returns or rethrows; kept to satisfy TypeScript + throw new Error('Paused execution not found or already resumed') + } + + private static async tryEnqueueOrStartResume( + args: EnqueueResumeArgs + ): Promise { + const { executionId, contextId, resumeInput, userId } = args + return await db.transaction(async (tx) => { const pausedExecution = await tx .select()