From dce25cf65565e0001afba6ef7a4c3c8151cae7a9 Mon Sep 17 00:00:00 2001 From: Carlos Chinchilla Corbacho <188046461+cchinchilla-dev@users.noreply.github.com> Date: Thu, 26 Feb 2026 19:18:24 +0100 Subject: [PATCH 1/2] feat(server): add async context manager support to EventQueue # Conflicts: # src/a2a/server/events/event_queue.py --- src/a2a/server/events/event_queue.py | 19 ++++++++++++++++++- tests/server/events/test_event_queue.py | 21 +++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/a2a/server/events/event_queue.py b/src/a2a/server/events/event_queue.py index d216d7eb2..dd483839f 100644 --- a/src/a2a/server/events/event_queue.py +++ b/src/a2a/server/events/event_queue.py @@ -2,7 +2,11 @@ import logging import sys -from a2a.types.a2a_pb2 import ( +from types import TracebackType + +from typing_extensions import Self + +from a2a.types import ( Message, Task, TaskArtifactUpdateEvent, @@ -43,6 +47,19 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None: self._lock = asyncio.Lock() logger.debug('EventQueue initialized.') + async def __aenter__(self) -> Self: + """Enters the async context manager, returning the queue itself.""" + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exits the async context manager, ensuring close() is called.""" + await self.close() + async def enqueue_event(self, event: Event) -> None: """Enqueues an event to this queue and all its children. diff --git a/tests/server/events/test_event_queue.py b/tests/server/events/test_event_queue.py index 686a90b3c..eb0bae110 100644 --- a/tests/server/events/test_event_queue.py +++ b/tests/server/events/test_event_queue.py @@ -77,6 +77,27 @@ def test_constructor_invalid_max_queue_size() -> None: ): EventQueue(max_queue_size=-10) +@pytest.mark.asyncio +async def test_event_queue_async_context_manager( + event_queue: EventQueue, +) -> None: + """Test that EventQueue can be used as an async context manager.""" + async with event_queue as q: + assert q is event_queue + assert event_queue.is_closed() is False + assert event_queue.is_closed() is True + + +@pytest.mark.asyncio +async def test_event_queue_async_context_manager_on_exception( + event_queue: EventQueue, +) -> None: + """Test that close() is called even when an exception occurs inside the context.""" + with pytest.raises(RuntimeError, match='boom'): + async with event_queue: + raise RuntimeError('boom') + assert event_queue.is_closed() is True + @pytest.mark.asyncio async def test_enqueue_and_dequeue_event(event_queue: EventQueue) -> None: From 526ee0bc7ad7a03141106f0cf3767a8baef78dbd Mon Sep 17 00:00:00 2001 From: Carlos Chinchilla Corbacho <188046461+cchinchilla-dev@users.noreply.github.com> Date: Thu, 26 Feb 2026 19:22:25 +0100 Subject: [PATCH 2/2] feat(server): add async context manager support to EventQueue (format) --- src/a2a/server/events/event_queue.py | 2 +- tests/server/events/test_event_queue.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/a2a/server/events/event_queue.py b/src/a2a/server/events/event_queue.py index dd483839f..d0099f4b2 100644 --- a/src/a2a/server/events/event_queue.py +++ b/src/a2a/server/events/event_queue.py @@ -6,7 +6,7 @@ from typing_extensions import Self -from a2a.types import ( +from a2a.types.a2a_pb2 import ( Message, Task, TaskArtifactUpdateEvent, diff --git a/tests/server/events/test_event_queue.py b/tests/server/events/test_event_queue.py index eb0bae110..2f1dc064b 100644 --- a/tests/server/events/test_event_queue.py +++ b/tests/server/events/test_event_queue.py @@ -77,6 +77,7 @@ def test_constructor_invalid_max_queue_size() -> None: ): EventQueue(max_queue_size=-10) + @pytest.mark.asyncio async def test_event_queue_async_context_manager( event_queue: EventQueue,