Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
import logging
import sys

from types import TracebackType

from typing_extensions import Self

from a2a.types.a2a_pb2 import (
Message,
Task,
Expand Down Expand Up @@ -43,6 +47,19 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
self._lock = asyncio.Lock()
logger.debug('EventQueue initialized.')

async def __aenter__(self) -> Self:
"""Enters the async context manager, returning the queue itself."""
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exits the async context manager, ensuring close() is called."""
await self.close()

async def enqueue_event(self, event: Event) -> None:
"""Enqueues an event to this queue and all its children.

Expand Down
22 changes: 22 additions & 0 deletions tests/server/events/test_event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,28 @@ def test_constructor_invalid_max_queue_size() -> None:
EventQueue(max_queue_size=-10)


@pytest.mark.asyncio
async def test_event_queue_async_context_manager(
event_queue: EventQueue,
) -> None:
"""Test that EventQueue can be used as an async context manager."""
async with event_queue as q:
assert q is event_queue
assert event_queue.is_closed() is False
assert event_queue.is_closed() is True


@pytest.mark.asyncio
async def test_event_queue_async_context_manager_on_exception(
event_queue: EventQueue,
) -> None:
"""Test that close() is called even when an exception occurs inside the context."""
with pytest.raises(RuntimeError, match='boom'):
async with event_queue:
raise RuntimeError('boom')
assert event_queue.is_closed() is True


@pytest.mark.asyncio
async def test_enqueue_and_dequeue_event(event_queue: EventQueue) -> None:
"""Test that an event can be enqueued and dequeued."""
Expand Down
Loading