diff --git a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts index 0a9aa008ba9..1afe4f8b1df 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts @@ -18,6 +18,11 @@ const { })) vi.mock('@/lib/auth/hybrid', () => ({ + AuthType: { + SESSION: 'session', + API_KEY: 'api_key', + INTERNAL_JWT: 'internal_jwt', + }, checkHybridAuth: mockCheckHybridAuth, })) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 1200debd413..db7c8d203ee 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -20,6 +20,10 @@ import { } from '@/lib/execution/call-chain' import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' import { processInputFileFields } from '@/lib/execution/files' +import { + registerManualExecutionAborter, + unregisterManualExecutionAborter, +} from '@/lib/execution/manual-cancellation' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { @@ -775,6 +779,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const encoder = new TextEncoder() const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync) let isStreamClosed = false + let isManualAbortRegistered = false const eventWriter = createExecutionEventWriter(executionId) setExecutionMeta(executionId, { @@ -787,6 +792,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: async start(controller) { let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null + registerManualExecutionAborter(executionId, timeoutController.abort) + isManualAbortRegistered = true + const sendEvent = (event: ExecutionEvent) => { if (!isStreamClosed) { try { @@ -1154,6 +1162,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }) finalMetaStatus = 'error' } finally { + if (isManualAbortRegistered) { + unregisterManualExecutionAborter(executionId) + isManualAbortRegistered = false + } try { await eventWriter.close() } catch (closeError) { diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts new file mode 100644 index 00000000000..e3f373675a0 --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts @@ -0,0 +1,148 @@ +/** + * @vitest-environment node + */ + +import { NextRequest } from 'next/server' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const mockCheckHybridAuth = vi.fn() +const mockAuthorizeWorkflowByWorkspacePermission = vi.fn() +const mockMarkExecutionCancelled = vi.fn() +const mockAbortManualExecution = vi.fn() + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }), +})) + +vi.mock('@/lib/auth/hybrid', () => ({ + checkHybridAuth: (...args: unknown[]) => mockCheckHybridAuth(...args), +})) + +vi.mock('@/lib/execution/cancellation', () => ({ + markExecutionCancelled: (...args: unknown[]) => mockMarkExecutionCancelled(...args), +})) + +vi.mock('@/lib/execution/manual-cancellation', () => ({ + abortManualExecution: (...args: unknown[]) => mockAbortManualExecution(...args), +})) + +vi.mock('@/lib/workflows/utils', () => ({ + authorizeWorkflowByWorkspacePermission: (params: unknown) => + mockAuthorizeWorkflowByWorkspacePermission(params), +})) + +import { POST } from './route' + +describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { + beforeEach(() => { + vi.clearAllMocks() + mockCheckHybridAuth.mockResolvedValue({ success: true, userId: 'user-1' }) + mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true }) + mockAbortManualExecution.mockReturnValue(false) + }) + + it('returns success when cancellation was durably recorded', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: true, + reason: 'recorded', + }) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: true, + executionId: 'ex-1', + redisAvailable: true, + durablyRecorded: true, + locallyAborted: false, + reason: 'recorded', + }) + }) + + it('returns unsuccessful response when Redis is unavailable', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: false, + reason: 'redis_unavailable', + }) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: false, + executionId: 'ex-1', + redisAvailable: false, + durablyRecorded: false, + locallyAborted: false, + reason: 'redis_unavailable', + }) + }) + + it('returns unsuccessful response when Redis persistence fails', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: false, + reason: 'redis_write_failed', + }) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: false, + executionId: 'ex-1', + redisAvailable: true, + durablyRecorded: false, + locallyAborted: false, + reason: 'redis_write_failed', + }) + }) + + it('returns success when local fallback aborts execution without Redis durability', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: false, + reason: 'redis_unavailable', + }) + mockAbortManualExecution.mockReturnValue(true) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: true, + executionId: 'ex-1', + redisAvailable: false, + durablyRecorded: false, + locallyAborted: true, + reason: 'redis_unavailable', + }) + }) +}) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts index 49c99e1ede6..04c24abbb28 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts @@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { checkHybridAuth } from '@/lib/auth/hybrid' import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { abortManualExecution } from '@/lib/execution/manual-cancellation' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' const logger = createLogger('CancelExecutionAPI') @@ -35,20 +36,27 @@ export async function POST( logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId }) - const marked = await markExecutionCancelled(executionId) + const cancellation = await markExecutionCancelled(executionId) + const locallyAborted = abortManualExecution(executionId) - if (marked) { + if (cancellation.durablyRecorded) { logger.info('Execution marked as cancelled in Redis', { executionId }) + } else if (locallyAborted) { + logger.info('Execution cancelled via local in-process fallback', { executionId }) } else { - logger.info('Redis not available, cancellation will rely on connection close', { + logger.warn('Execution cancellation was not durably recorded', { executionId, + reason: cancellation.reason, }) } return NextResponse.json({ - success: true, + success: cancellation.durablyRecorded || locallyAborted, executionId, - redisAvailable: marked, + redisAvailable: cancellation.reason !== 'redis_unavailable', + durablyRecorded: cancellation.durablyRecorded, + locallyAborted, + reason: cancellation.reason, }) } catch (error: any) { logger.error('Failed to cancel execution', { workflowId, executionId, error: error.message }) diff --git a/apps/sim/lib/execution/cancellation.test.ts b/apps/sim/lib/execution/cancellation.test.ts new file mode 100644 index 00000000000..d68fe46a2bf --- /dev/null +++ b/apps/sim/lib/execution/cancellation.test.ts @@ -0,0 +1,84 @@ +import { loggerMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockGetRedisClient, mockRedisSet } = vi.hoisted(() => ({ + mockGetRedisClient: vi.fn(), + mockRedisSet: vi.fn(), +})) + +vi.mock('@sim/logger', () => loggerMock) + +vi.mock('@/lib/core/config/redis', () => ({ + getRedisClient: mockGetRedisClient, +})) + +import { markExecutionCancelled } from './cancellation' +import { + abortManualExecution, + registerManualExecutionAborter, + unregisterManualExecutionAborter, +} from './manual-cancellation' + +describe('markExecutionCancelled', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('returns redis_unavailable when no Redis client exists', async () => { + mockGetRedisClient.mockReturnValue(null) + + await expect(markExecutionCancelled('execution-1')).resolves.toEqual({ + durablyRecorded: false, + reason: 'redis_unavailable', + }) + }) + + it('returns recorded when Redis write succeeds', async () => { + mockRedisSet.mockResolvedValue('OK') + mockGetRedisClient.mockReturnValue({ set: mockRedisSet }) + + await expect(markExecutionCancelled('execution-1')).resolves.toEqual({ + durablyRecorded: true, + reason: 'recorded', + }) + }) + + it('returns redis_write_failed when Redis write throws', async () => { + mockRedisSet.mockRejectedValue(new Error('set failed')) + mockGetRedisClient.mockReturnValue({ set: mockRedisSet }) + + await expect(markExecutionCancelled('execution-1')).resolves.toEqual({ + durablyRecorded: false, + reason: 'redis_write_failed', + }) + }) +}) + +describe('manual execution cancellation registry', () => { + beforeEach(() => { + unregisterManualExecutionAborter('execution-1') + }) + + it('aborts registered executions', () => { + const abort = vi.fn() + + registerManualExecutionAborter('execution-1', abort) + + expect(abortManualExecution('execution-1')).toBe(true) + expect(abort).toHaveBeenCalledTimes(1) + }) + + it('returns false when no execution is registered', () => { + expect(abortManualExecution('execution-missing')).toBe(false) + }) + + it('unregisters executions', () => { + const abort = vi.fn() + + registerManualExecutionAborter('execution-1', abort) + unregisterManualExecutionAborter('execution-1') + + expect(abortManualExecution('execution-1')).toBe(false) + expect(abort).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/execution/cancellation.ts b/apps/sim/lib/execution/cancellation.ts index 671209e9c66..26273f8521b 100644 --- a/apps/sim/lib/execution/cancellation.ts +++ b/apps/sim/lib/execution/cancellation.ts @@ -6,27 +6,36 @@ const logger = createLogger('ExecutionCancellation') const EXECUTION_CANCEL_PREFIX = 'execution:cancel:' const EXECUTION_CANCEL_EXPIRY = 60 * 60 +export type ExecutionCancellationRecordResult = + | { durablyRecorded: true; reason: 'recorded' } + | { + durablyRecorded: false + reason: 'redis_unavailable' | 'redis_write_failed' + } + export function isRedisCancellationEnabled(): boolean { return getRedisClient() !== null } /** * Mark an execution as cancelled in Redis. - * Returns true if Redis is available and the flag was set, false otherwise. + * Returns whether the cancellation was durably recorded. */ -export async function markExecutionCancelled(executionId: string): Promise { +export async function markExecutionCancelled( + executionId: string +): Promise { const redis = getRedisClient() if (!redis) { - return false + return { durablyRecorded: false, reason: 'redis_unavailable' } } try { await redis.set(`${EXECUTION_CANCEL_PREFIX}${executionId}`, '1', 'EX', EXECUTION_CANCEL_EXPIRY) logger.info('Marked execution as cancelled', { executionId }) - return true + return { durablyRecorded: true, reason: 'recorded' } } catch (error) { logger.error('Failed to mark execution as cancelled', { executionId, error }) - return false + return { durablyRecorded: false, reason: 'redis_write_failed' } } } diff --git a/apps/sim/lib/execution/manual-cancellation.ts b/apps/sim/lib/execution/manual-cancellation.ts new file mode 100644 index 00000000000..5e5da4b5187 --- /dev/null +++ b/apps/sim/lib/execution/manual-cancellation.ts @@ -0,0 +1,19 @@ +const activeExecutionAborters = new Map void>() + +export function registerManualExecutionAborter(executionId: string, abort: () => void): void { + activeExecutionAborters.set(executionId, abort) +} + +export function unregisterManualExecutionAborter(executionId: string): void { + activeExecutionAborters.delete(executionId) +} + +export function abortManualExecution(executionId: string): boolean { + const abort = activeExecutionAborters.get(executionId) + if (!abort) { + return false + } + + abort() + return true +}