fix: replace asyncio.wait_for with asyncio.timeout to preserve OTEL context in multiagent#1974
Open
giulio-leone wants to merge 1 commit intostrands-agents:mainfrom
Open
Conversation
…ontext
asyncio.wait_for() wraps coroutines in new asyncio Tasks via
ensure_future(), which copies the contextvars.Context. When an async
generator with an active OpenTelemetry use_span context manager is
iterated through wait_for(), the span token is created in the original
context but context.detach() runs in the copied context, raising:
ValueError: <Token var=...> was created in a different Context
Replace wait_for() with asyncio.timeout() (Python 3.11+) which cancels
the current task instead of creating a new one, preserving context
identity. Add a Python 3.10 fallback that uses inter-event deadline
checks to avoid creating new tasks.
Applied to both Swarm._stream_with_timeout() and
Graph._stream_node_to_queue() which had the same pattern.
Closes strands-agents#1316
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When using Swarm or Graph multiagent patterns with OpenTelemetry tracing enabled,
context.detach(token)fails with:This occurs during concurrent/streaming agent execution when the OTEL
use_spancontext manager's__exit__tries to detach a token that was created in a differentcontextvars.Context.Root Cause
Swarm._stream_with_timeout()(line 453) usesasyncio.wait_for()to wrap eachasync_generator.__anext__()call with a timeout:asyncio.wait_for()internally callsensure_future()on the coroutine, which wraps it in a newasyncio.Task. Each new task receives a copy of the currentcontextvars.Context. When an async generator has an activetrace_api.use_span()context manager:use_span.__enter__()callscontext.attach(ctx)→ returns a token in Context Await_for()resumes the generator in a new task with Context B (a copy of A)use_span.__exit__()callscontext.detach(token)in Context BThe same pattern existed in
Graph._stream_node_to_queue()which wrappedstream_node()inasyncio.wait_for().Fix
Replace
asyncio.wait_for()withasyncio.timeout()(Python 3.11+), which uses task cancellation instead of creating new tasks, preservingcontextvars.Contextidentity.For Python 3.10 compatibility, a fallback uses inter-event deadline checks that iterate the async generator directly without wrapping it in a new task.
Changes
swarm.py: Replacewait_for(__anext__(), timeout=remaining)loop withasyncio.timeout()context manager (3.11+) / deadline check (3.10)graph.py: Replacewait_for(stream_node(), timeout=self.node_timeout)with the same patterntest_swarm.py: Addtest_swarm_timeout_preserves_otel_contextthat verifiesContextVar.reset(token)succeeds during timeout-wrapped streamingTesting
Closes #1316