-
Notifications
You must be signed in to change notification settings - Fork 3.4k
feat(sim-mailer): email inbox for mothership with chat history and plan gating #3558
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+16,799
−5
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
88f3558
feat(sim-mailer): email inbox for mothership with chat history and pl…
waleedlatif1 0fc5333
revert hardcoded ff
waleedlatif1 7ac4875
fix(inbox): address PR review comments - plan enforcement, idempotenc…
waleedlatif1 183a880
improvement(inbox): harden security and efficiency from code audit
waleedlatif1 347b0c9
fix(inbox): replace Bearer token auth with proper Svix HMAC-SHA256 we…
waleedlatif1 95c02d7
fix(inbox): require webhook secret — reject requests when secret is m…
waleedlatif1 720fd7f
fix(inbox): address second round of PR review comments
waleedlatif1 d712c51
fix(inbox): address third round of PR review comments
waleedlatif1 27a42bc
fix(inbox): validate cursor param, preserve code blocks in HTML strip…
waleedlatif1 118fc41
fix(inbox): return 500 on webhook server errors to enable Svix retries
waleedlatif1 1a0aced
fix(inbox): remove isHosted guard from hasInboxAccess — feature flag …
waleedlatif1 b786865
fix(inbox): prevent double-enable from deleting webhook secret row
waleedlatif1 b936cdb
fix(inbox): null-safe stripThinkingTags, encode URL params, surface r…
waleedlatif1 0b316ee
improvement(inbox): remove unused types, narrow SELECT queries, fix o…
waleedlatif1 29a855f
fix(inbox): add keyboard accessibility to clickable task rows
waleedlatif1 24df402
fix(inbox): use Svix library for webhook verification, fix responseSe…
waleedlatif1 d36b3ee
chore(db): rebase inbox migration onto feat/mothership-copilot (0172 …
waleedlatif1 60b1a54
fix(db): rebase inbox migration to 0173 after feat/mothership-copilot…
waleedlatif1 0355565
fix(db): regenerate inbox migration after rebase on feat/mothership-c…
waleedlatif1 8b6a3b3
fix(inbox): case-insensitive email match and sanitize javascript: URI…
waleedlatif1 4997216
fix(inbox): case-insensitive email match in resolveUserId
waleedlatif1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,277 @@ | ||
| import { | ||
| db, | ||
| mothershipInboxAllowedSender, | ||
| mothershipInboxTask, | ||
| mothershipInboxWebhook, | ||
| permissions, | ||
| user, | ||
| workspace, | ||
| } from '@sim/db' | ||
| import { createLogger } from '@sim/logger' | ||
| import { tasks } from '@trigger.dev/sdk' | ||
| import { and, eq, gt, ne, sql } from 'drizzle-orm' | ||
| import { NextResponse } from 'next/server' | ||
| import { Webhook } from 'svix' | ||
| import { v4 as uuidv4 } from 'uuid' | ||
| import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' | ||
| import { executeInboxTask } from '@/lib/mothership/inbox/executor' | ||
| import type { AgentMailWebhookPayload, RejectionReason } from '@/lib/mothership/inbox/types' | ||
|
|
||
| const logger = createLogger('AgentMailWebhook') | ||
|
|
||
| const AUTOMATED_SENDERS = ['mailer-daemon@', 'noreply@', 'no-reply@', 'postmaster@'] | ||
| const MAX_EMAILS_PER_HOUR = 20 | ||
|
|
||
| export async function POST(req: Request) { | ||
| try { | ||
| const rawBody = await req.text() | ||
| const svixId = req.headers.get('svix-id') | ||
| const svixTimestamp = req.headers.get('svix-timestamp') | ||
| const svixSignature = req.headers.get('svix-signature') | ||
|
|
||
| const payload = JSON.parse(rawBody) as AgentMailWebhookPayload | ||
|
|
||
| if (payload.event_type !== 'message.received') { | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
|
|
||
| const { message } = payload | ||
| const inboxId = message?.inbox_id | ||
| if (!message || !inboxId) { | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| const [result] = await db | ||
| .select({ | ||
| id: workspace.id, | ||
| inboxEnabled: workspace.inboxEnabled, | ||
| inboxAddress: workspace.inboxAddress, | ||
| inboxProviderId: workspace.inboxProviderId, | ||
| webhookSecret: mothershipInboxWebhook.secret, | ||
| }) | ||
| .from(workspace) | ||
| .leftJoin(mothershipInboxWebhook, eq(mothershipInboxWebhook.workspaceId, workspace.id)) | ||
| .where(eq(workspace.inboxProviderId, inboxId)) | ||
| .limit(1) | ||
|
|
||
| if (!result || !result.webhookSecret) { | ||
| if (!result) { | ||
| logger.warn('No workspace found for inbox', { inboxId }) | ||
| } else { | ||
| logger.warn('No webhook secret found for workspace', { workspaceId: result.id }) | ||
| } | ||
| return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) | ||
| } | ||
|
|
||
| try { | ||
| const wh = new Webhook(result.webhookSecret) | ||
| wh.verify(rawBody, { | ||
| 'svix-id': svixId || '', | ||
| 'svix-timestamp': svixTimestamp || '', | ||
| 'svix-signature': svixSignature || '', | ||
| }) | ||
| } catch (verifyErr) { | ||
| logger.warn('Webhook signature verification failed', { | ||
| workspaceId: result.id, | ||
| error: verifyErr instanceof Error ? verifyErr.message : 'Unknown error', | ||
| }) | ||
| return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) | ||
| } | ||
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if (!result.inboxEnabled) { | ||
| logger.info('Inbox disabled, rejecting', { workspaceId: result.id }) | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
|
|
||
| const fromEmail = extractSenderEmail(message.from_) || '' | ||
| logger.info('Webhook received', { fromEmail, from_raw: message.from_, workspaceId: result.id }) | ||
|
|
||
| if (result.inboxAddress && fromEmail === result.inboxAddress.toLowerCase()) { | ||
| logger.info('Skipping email from inbox itself', { workspaceId: result.id }) | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
|
|
||
| if (AUTOMATED_SENDERS.some((prefix) => fromEmail.startsWith(prefix))) { | ||
| await createRejectedTask(result.id, message, 'automated_sender') | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
|
|
||
| const emailMessageId = message.message_id | ||
| const inReplyTo = message.in_reply_to || null | ||
|
|
||
| const [existingResult, isAllowed, recentCount, parentTaskResult] = await Promise.all([ | ||
| emailMessageId | ||
| ? db | ||
| .select({ id: mothershipInboxTask.id }) | ||
| .from(mothershipInboxTask) | ||
| .where(eq(mothershipInboxTask.emailMessageId, emailMessageId)) | ||
| .limit(1) | ||
| : Promise.resolve([]), | ||
| isSenderAllowed(fromEmail, result.id), | ||
| getRecentTaskCount(result.id), | ||
| inReplyTo | ||
| ? db | ||
| .select({ chatId: mothershipInboxTask.chatId }) | ||
| .from(mothershipInboxTask) | ||
| .where(eq(mothershipInboxTask.responseMessageId, inReplyTo)) | ||
| .limit(1) | ||
| : Promise.resolve([]), | ||
| ]) | ||
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if (existingResult[0]) { | ||
| logger.info('Duplicate webhook, skipping', { emailMessageId }) | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
|
|
||
| if (!isAllowed) { | ||
| await createRejectedTask(result.id, message, 'sender_not_allowed') | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
|
|
||
| if (recentCount >= MAX_EMAILS_PER_HOUR) { | ||
| await createRejectedTask(result.id, message, 'rate_limit_exceeded') | ||
| return NextResponse.json({ ok: true }) | ||
| } | ||
|
|
||
| const chatId = parentTaskResult[0]?.chatId ?? null | ||
|
|
||
| const fromName = extractDisplayName(message.from_) | ||
|
|
||
| const taskId = uuidv4() | ||
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| const bodyText = message.text?.substring(0, 50_000) || null | ||
| const bodyHtml = message.html?.substring(0, 50_000) || null | ||
| const bodyPreview = (bodyText || '')?.substring(0, 200) || null | ||
|
|
||
| await db.insert(mothershipInboxTask).values({ | ||
| id: taskId, | ||
| workspaceId: result.id, | ||
| fromEmail, | ||
| fromName, | ||
| subject: message.subject || '(no subject)', | ||
| bodyPreview, | ||
| bodyText, | ||
| bodyHtml, | ||
| emailMessageId, | ||
| inReplyTo, | ||
| agentmailMessageId: message.message_id, | ||
| status: 'received', | ||
| chatId, | ||
| hasAttachments: (message.attachments?.length ?? 0) > 0, | ||
| ccRecipients: message.cc?.length ? JSON.stringify(message.cc) : null, | ||
| }) | ||
|
|
||
| if (isTriggerDevEnabled) { | ||
| try { | ||
| const handle = await tasks.trigger('mothership-inbox-execution', { taskId }) | ||
| await db | ||
| .update(mothershipInboxTask) | ||
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .set({ triggerJobId: handle.id }) | ||
| .where(eq(mothershipInboxTask.id, taskId)) | ||
| } catch (triggerError) { | ||
| logger.warn('Trigger.dev dispatch failed, falling back to local execution', { | ||
| taskId, | ||
| triggerError, | ||
| }) | ||
| executeInboxTask(taskId).catch((err) => { | ||
| logger.error('Local inbox task execution failed', { | ||
| taskId, | ||
| error: err instanceof Error ? err.message : 'Unknown error', | ||
| }) | ||
| }) | ||
| } | ||
| } else { | ||
| logger.info('Trigger.dev not available, executing inbox task locally', { taskId }) | ||
| executeInboxTask(taskId).catch((err) => { | ||
| logger.error('Local inbox task execution failed', { | ||
| taskId, | ||
| error: err instanceof Error ? err.message : 'Unknown error', | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
| return NextResponse.json({ ok: true }) | ||
| } catch (error) { | ||
| logger.error('AgentMail webhook error', { | ||
| error: error instanceof Error ? error.message : 'Unknown error', | ||
| }) | ||
| return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) | ||
| } | ||
| } | ||
|
|
||
| async function isSenderAllowed(email: string, workspaceId: string): Promise<boolean> { | ||
| const [allowedSenderResult, memberResult] = await Promise.all([ | ||
| db | ||
| .select({ id: mothershipInboxAllowedSender.id }) | ||
| .from(mothershipInboxAllowedSender) | ||
| .where( | ||
| and( | ||
| eq(mothershipInboxAllowedSender.workspaceId, workspaceId), | ||
| eq(mothershipInboxAllowedSender.email, email) | ||
| ) | ||
| ) | ||
| .limit(1), | ||
| db | ||
| .select({ userId: permissions.userId }) | ||
| .from(permissions) | ||
| .innerJoin(user, eq(permissions.userId, user.id)) | ||
| .where( | ||
| and( | ||
| eq(permissions.entityType, 'workspace'), | ||
| eq(permissions.entityId, workspaceId), | ||
| sql`lower(${user.email}) = ${email}` | ||
| ) | ||
| ) | ||
| .limit(1), | ||
| ]) | ||
|
|
||
| return !!(allowedSenderResult[0] || memberResult[0]) | ||
| } | ||
|
|
||
| async function getRecentTaskCount(workspaceId: string): Promise<number> { | ||
| const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000) | ||
| const [result] = await db | ||
| .select({ count: sql<number>`count(*)::int` }) | ||
waleedlatif1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .from(mothershipInboxTask) | ||
| .where( | ||
| and( | ||
| eq(mothershipInboxTask.workspaceId, workspaceId), | ||
| gt(mothershipInboxTask.createdAt, oneHourAgo), | ||
| ne(mothershipInboxTask.status, 'rejected') | ||
| ) | ||
| ) | ||
| return result?.count ?? 0 | ||
| } | ||
|
|
||
| async function createRejectedTask( | ||
| workspaceId: string, | ||
| message: AgentMailWebhookPayload['message'], | ||
| reason: RejectionReason | ||
| ): Promise<void> { | ||
| await db.insert(mothershipInboxTask).values({ | ||
| id: uuidv4(), | ||
| workspaceId, | ||
| fromEmail: extractSenderEmail(message.from_) || 'unknown', | ||
| fromName: extractDisplayName(message.from_), | ||
| subject: message.subject || '(no subject)', | ||
| bodyPreview: (message.text || '').substring(0, 200) || null, | ||
| emailMessageId: message.message_id, | ||
| agentmailMessageId: message.message_id, | ||
| status: 'rejected', | ||
| rejectionReason: reason, | ||
| hasAttachments: (message.attachments?.length ?? 0) > 0, | ||
| }) | ||
| } | ||
|
|
||
| /** | ||
| * Extract the raw email address from AgentMail's from_ field. | ||
| * Format: "username@domain.com" or "Display Name <username@domain.com>" | ||
| */ | ||
| function extractSenderEmail(from: string): string { | ||
| const match = from.match(/<([^>]+)>/) | ||
| return (match?.[1] || from).toLowerCase().trim() | ||
| } | ||
|
|
||
| function extractDisplayName(from: string): string | null { | ||
| const match = from.match(/^(.+?)\s*</) | ||
| return match?.[1]?.trim().replace(/^"|"$/g, '') || null | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.