feat: add DDS driver using Eclipse CycloneDDS #393

vtz wants to merge 4 commits into jumpstarter-dev:main from
Conversation
❌ Deploy Preview for jumpstarter-docs failed.

📝 Walkthrough

Adds new jumpstarter-driver-dds (Eclipse CycloneDDS) and jumpstarter-driver-gptp packages, each with a driver, client, mock backend, tests, docs, and an example exporter config.
Sequence Diagram

sequenceDiagram
participant User as User
participant Client as DdsClient
participant Driver as Dds / MockDds
participant Backend as DdsBackend / MockDdsBackend
participant CycloneDDS as CycloneDDS
User->>Client: connect()
Client->>Driver: connect()
Driver->>Backend: connect()
Backend->>CycloneDDS: (real) create DomainParticipant
CycloneDDS-->>Backend: participant
Backend-->>Driver: DdsParticipantInfo
Driver-->>Client: DdsParticipantInfo
User->>Client: create_topic(name, fields, qos)
Client->>Driver: create_topic(...)
Driver->>Backend: create_topic(...)
Backend->>CycloneDDS: (real) generate IdlStruct + create Topic/Writer/Reader
CycloneDDS-->>Backend: topic objects
Backend-->>Driver: DdsTopicInfo
Driver-->>Client: DdsTopicInfo
User->>Client: publish(topic_name, data)
Client->>Driver: publish(...)
Driver->>Backend: publish(...)
Backend->>CycloneDDS: (real) write(sample)
CycloneDDS-->>Backend: success
Backend-->>Driver: DdsPublishResult
Driver-->>Client: DdsPublishResult
User->>Client: read(topic_name)
Client->>Driver: read(...)
Driver->>Backend: read(...)
Backend-->>Driver: DdsReadResult
Driver-->>Client: DdsReadResult
User->>Client: disconnect()
Client->>Driver: disconnect()
Driver->>Backend: disconnect()
Backend->>CycloneDDS: cleanup (real)
CycloneDDS-->>Backend: cleanup complete
Backend-->>Driver: success
Driver-->>Client: success
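The call flow above can be sketched with a minimal in-memory stand-in. This is an illustrative sketch only: the class and method names follow the diagram, the dict return values merely stand in for the DdsParticipantInfo/DdsTopicInfo/DdsPublishResult/DdsReadResult models, and the real backend delegates to CycloneDDS.

```python
class MockDdsBackend:
    """In-memory stand-in for the CycloneDDS-backed backend (names assumed from the diagram)."""

    def __init__(self):
        self._connected = False
        self._topics = {}   # topic name -> declared field names
        self._samples = {}  # topic name -> published samples

    def connect(self):
        self._connected = True
        return {"domain_id": 0}  # stands in for DdsParticipantInfo

    def create_topic(self, name, fields):
        self._topics[name] = list(fields)
        self._samples[name] = []
        return {"name": name, "fields": list(fields)}  # stands in for DdsTopicInfo

    def publish(self, topic_name, data):
        self._samples[topic_name].append(dict(data))
        return {"ok": True}  # stands in for DdsPublishResult

    def read(self, topic_name):
        return list(self._samples[topic_name])  # stands in for DdsReadResult

    def disconnect(self):
        self._connected = False


# Exercise the same sequence the diagram shows: connect -> create_topic ->
# publish -> read -> disconnect.
backend = MockDdsBackend()
backend.connect()
backend.create_topic("sensor/temperature", ["value", "unit"])
backend.publish("sensor/temperature", {"value": "21.5", "unit": "C"})
samples = backend.read("sensor/temperature")
backend.disconnect()
```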
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs
🚥 Pre-merge checks: 2 passed, 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 12
🧹 Nitpick comments (8)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py (3)
396-412: monitor() accesses backend internals directly.

The method accesses self._backend._require_connected() and self._backend._topics directly. These are internal details that should be exposed through public methods for consistency.

♻️ Suggested improvement

Add a public method to backends:

```python
def has_topic(self, name: str) -> bool:
    self._require_connected()
    return name in self._topics
```

Then:

```diff
-        self._backend._require_connected()
-        if topic_name not in self._backend._topics:
+        if not self._backend.has_topic(topic_name):
             raise ValueError(f"Topic '{topic_name}' not registered")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py` around lines 396 - 412, monitor currently accesses backend internals (_require_connected and _topics) directly; add a public backend method has_topic(name: str) -> bool that calls the backend's internal _require_connected() and returns name in self._topics, then update monitor to call backend.has_topic(topic_name) instead of inspecting _topics and remove the direct call to _require_connected; keep using the existing public backend.read(...) to fetch samples. Ensure the new has_topic is part of the backend API so monitor only uses public methods (has_topic and read).
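The accessor the prompt describes can be sketched in isolation. This is a hypothetical minimal backend (not the real DdsBackend): only the _connected/_topics attributes and the method shape are taken from the review comment.

```python
class Backend:
    """Minimal sketch of the suggested public accessor (hypothetical class)."""

    def __init__(self):
        self._connected = False
        self._topics = {}

    def _require_connected(self):
        if not self._connected:
            raise RuntimeError("not connected")

    def has_topic(self, name: str) -> bool:
        # Public wrapper so callers never touch _topics directly.
        self._require_connected()
        return name in self._topics


b = Backend()
b._connected = True
b._topics["sensor/temperature"] = object()
```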
52-65: Dynamic IDL type creation assumes all fields are strings.

The _make_idl_type function creates dataclass fields all typed as str with default "". This limits the driver to string-only topic schemas. Consider documenting this limitation or supporting type annotations.

```diff
 def _make_idl_type(topic_name: str, fields: list[str]):
-    """Dynamically create a CycloneDDS IdlStruct type for the given fields.
+    """Dynamically create a CycloneDDS IdlStruct type for the given fields.
 
-    Each field name maps to a ``str`` type. The generated class name is
-    derived from the topic name to avoid collisions between topics.
+    Each field name maps to a ``str`` type. For complex types, consider
+    pre-defining IdlStruct classes. The generated class name is derived
+    from the topic name to avoid collisions between topics.
+
+    Note: This implementation only supports string fields. For mixed types,
+    define custom IdlStruct subclasses directly.
     """
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py` around lines 52 - 65, The function _make_idl_type currently forces every field to be a str with default "", which prevents non-string schemas; change the API to accept fields as either a list of names or a list of (name, type, [default]) tuples and build dc_fields accordingly in _make_idl_type: preserve the existing behavior when items are plain strings (map to str, default ""), but when an item is a tuple use the provided type and default (or omit default if not provided) so dataclass fields have correct type annotations; update callers that pass fields to supply types where needed and add or update a short docstring noting the supported field formats and backward compatibility with the string-only form.
325-331: Direct access to _backend._connected breaks encapsulation.

The close() method accesses _backend._connected directly instead of using the backend's public interface. Consider adding an is_connected property or method to the backend.

♻️ Proposed fix to improve encapsulation

Add to DdsBackend and MockDdsBackend:

```python
@property
def is_connected(self) -> bool:
    return self._connected
```

Then update Dds.close():

```diff
 def close(self):
-    if self._backend._connected:
+    if self._backend.is_connected:
         try:
             self._backend.disconnect()
         except Exception:
             logger.warning("Failed to disconnect DDS backend", exc_info=True)
     super().close()
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py` around lines 325 - 331, The close() method currently breaks encapsulation by reading _backend._connected directly; add a public read-only property is_connected to the backend implementations (e.g., DdsBackend and MockDdsBackend) that returns their internal _connected flag, then update Dds.close (the close method shown) to call self._backend.is_connected instead of accessing _backend._connected and retain the existing try/except around self._backend.disconnect() and the subsequent super().close() call.

python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py (1)
177-188: Method name collision: CLI monitor function shadows the class method.

The inner function monitor on line 179 has the same name as the class method self.monitor() on line 131. While this works because self.monitor() is accessed through the closure, it can be confusing during debugging. Consider renaming the CLI function to monitor_cmd or similar for clarity.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py` around lines 177 - 188, The CLI inner function named monitor shadows the class method self.monitor(), which is confusing; rename the CLI callback (e.g., monitor_cmd) and update the decorator reference (`@base.command`() bound function name) and any uses so the CLI handler is distinct from the class method, keeping the call to self.monitor() unchanged inside the new monitor_cmd function; ensure click.option remains attached to the renamed function and update any tests or references to the old monitor CLI name if present.

python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py (3)
216-220: Temporary config file handle remains open while ptp4l runs.

The NamedTemporaryFile is created with delete=False but the file handle is kept open via self._config_file. After writing and flushing, the file descriptor remains open. On Windows this could prevent ptp4l from reading it; on Linux (the target), it works but holding the handle isn't necessary.

♻️ Suggested improvement to close handle after writing

```diff
-        self._config_file = tempfile.NamedTemporaryFile(
+        config_file = tempfile.NamedTemporaryFile(
             mode="w", suffix=".cfg", prefix="ptp4l_", delete=False
         )
-        self._config_file.write(config_content)
-        self._config_file.flush()
+        config_file.write(config_content)
+        config_file.close()
+        self._config_file_path = config_file.name
```

Then update line 232 and 297-303 to use self._config_file_path instead.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py` around lines 216 - 220, The temporary config file handle self._config_file is left open after writing; change the creation to write and flush then close the NamedTemporaryFile and store its path in self._config_file_path (e.g., self._config_file_path = self._config_file.name) and remove/close self._config_file to release the descriptor, then update any code that referenced self._config_file (notably the code paths in start() and the ptp4l invocation around the earlier lines) to use self._config_file_path when passing the filename to ptp4l or other readers.
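The write-close-keep-the-path pattern from the suggestion can be shown standalone. A sketch under assumptions: the config content and the _config_file_path attribute name come from the review comment, and reading the file back stands in for ptp4l consuming it.

```python
import os
import tempfile

config_content = "[global]\npriority1 248\n"  # illustrative ptp4l-style config

config_file = tempfile.NamedTemporaryFile(
    mode="w", suffix=".cfg", prefix="ptp4l_", delete=False
)
config_file.write(config_content)
config_file.close()  # release the descriptor before another process reads it
config_file_path = config_file.name  # would be stored as self._config_file_path

with open(config_file_path) as f:  # stand-in for ptp4l reading the config
    on_disk = f.read()

os.unlink(config_file_path)  # explicit cleanup, as stop() would do
```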
360-382: Stub implementations return empty/default values.

get_clock_identity() returns an empty string and get_parent_info() returns default values. These appear to be placeholder implementations. Consider adding a TODO comment or raising NotImplementedError if these are intended to be implemented later.

Would you like me to help implement these methods by parsing additional ptp4l output or querying the PTP management interface?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py` around lines 360 - 382, The stub methods get_clock_identity and get_parent_info currently return empty/default values; replace those placeholders by raising NotImplementedError (with a clear message like "get_clock_identity not implemented" / "get_parent_info not implemented") after the existing self._require_started() call and add a TODO comment indicating the intended implementation (e.g., parse ptp4l output or query PTP management interface) so callers and CI know these are intentionally unimplemented; keep the existing decorators (`@export`, `@validate_call`) and method signatures intact.
419: Magic number 100 for streaming iteration limit.

Both Gptp.read() and MockGptp.read() iterate 100 times before stopping. This hardcoded limit could be documented or made configurable.

Also applies to: 581
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py` at line 419, The hardcoded magic number 100 used in the streaming loops of Gptp.read and MockGptp.read should be replaced with a named, configurable limit: introduce a constant or configurable attribute (e.g., STREAM_ITER_LIMIT or self.max_stream_iterations) and use it instead of the literal 100 in both Gptp.read and MockGptp.read; expose the value via constructor args or config so callers/tests can override, update any places that rely on the default, and add a brief docstring or comment explaining the purpose of the limit.python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py (1)
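The named, overridable limit the prompt asks for can be sketched as follows. The constant and attribute names (STREAM_ITER_LIMIT, max_stream_iterations) are the prompt's suggestions, and the Reader class is a stand-in, not the real driver.

```python
DEFAULT_STREAM_ITER_LIMIT = 100  # replaces the bare literal 100


class Reader:
    """Sketch of a configurable cap on the streaming read loop."""

    def __init__(self, max_stream_iterations: int = DEFAULT_STREAM_ITER_LIMIT):
        # Exposed via the constructor so callers and tests can override it.
        self.max_stream_iterations = max_stream_iterations

    def read(self):
        # Yield at most max_stream_iterations samples instead of a magic 100.
        for i in range(self.max_stream_iterations):
            yield {"seq": i}


samples = list(Reader(max_stream_iterations=3).read())
```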
576-594: Network namespace cleanup may leave orphaned resources on test failure.

The veth_pair fixture uses check=True for setup but check=False for teardown. If setup fails partway through (e.g., after creating ns-ptp-master but before creating the veth pair), the teardown won't clean up properly since it only deletes namespaces.

♻️ Suggested improvement for robust cleanup

```diff
 @pytest.fixture
 def veth_pair(self):
     import subprocess as sp
+    created_ns = []
     cmds = [
         "ip netns add ns-ptp-master",
         "ip netns add ns-ptp-slave",
         "ip link add veth-m type veth peer name veth-s",
         "ip link set veth-m netns ns-ptp-master",
         "ip link set veth-s netns ns-ptp-slave",
         "ip netns exec ns-ptp-master ip addr add 10.99.0.1/24 dev veth-m",
         "ip netns exec ns-ptp-slave ip addr add 10.99.0.2/24 dev veth-s",
         "ip netns exec ns-ptp-master ip link set veth-m up",
         "ip netns exec ns-ptp-slave ip link set veth-s up",
     ]
-    for cmd in cmds:
-        sp.run(cmd.split(), check=True)
-    yield ("ns-ptp-master", "veth-m", "ns-ptp-slave", "veth-s")
-    sp.run("ip netns del ns-ptp-master".split(), check=False)
-    sp.run("ip netns del ns-ptp-slave".split(), check=False)
+    try:
+        for cmd in cmds:
+            sp.run(cmd.split(), check=True)
+            if "netns add" in cmd:
+                ns_name = cmd.split()[-1]
+                created_ns.append(ns_name)
+        yield ("ns-ptp-master", "veth-m", "ns-ptp-slave", "veth-s")
+    finally:
+        for ns in reversed(created_ns):
+            sp.run(["ip", "netns", "del", ns], check=False)
```
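The track-and-tear-down pattern in that suggestion can be demonstrated without root privileges. This sketch replaces subprocess.run with a recording stand-in (run() and fail_on are hypothetical) and deliberately fails mid-setup to show that only the resources actually created get cleaned up, in reverse order.

```python
created_ns = []
deleted = []


def run(cmd, fail_on=None):
    """Stand-in for sp.run(cmd.split(), check=True)."""
    if cmd == fail_on:
        raise RuntimeError(f"setup failed at: {cmd}")
    if cmd.startswith("ip netns add "):
        created_ns.append(cmd.split()[-1])


cmds = [
    "ip netns add ns-ptp-master",
    "ip netns add ns-ptp-slave",
    "ip link add veth-m type veth peer name veth-s",
]

try:
    for cmd in cmds:
        # Fail partway through, after both namespaces exist.
        run(cmd, fail_on="ip link add veth-m type veth peer name veth-s")
except RuntimeError:
    pass
finally:
    # Tear down only what was created, newest first (check=False equivalent).
    for ns in reversed(created_ns):
        deleted.append(ns)
```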
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py`:
- Around line 50-55: The current list_topics method swallows malformed responses
by returning [] when raw isn't a list; change list_topics to validate the RPC
result: check that raw is a list and that each element can be parsed with
DdsTopicInfo.model_validate, and if not raise a clear exception (e.g.,
TypeError/ValueError) that includes the unexpected raw payload to surface
protocol/serialization bugs; keep the existing behavior of returning the list of
DdsTopicInfo instances when validation succeeds.
- Around line 74-85: The CLI helper _register_lifecycle_commands is registering
functions named connect_cmd and disconnect_cmd which produce unwanted CLI names
like connect-cmd; update the base.command() calls inside
_register_lifecycle_commands to pass explicit name arguments (e.g.
base.command(name="connect") for the connect_cmd function and
base.command(name="disconnect") for disconnect_cmd) so the CLI commands are
"connect" and "disconnect"; do the same pattern for other command functions that
end with _cmd (e.g. read_cmd → name="read", monitor_cmd → name="monitor") to
ensure correct kebab-case names without the trailing "-cmd".
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py`:
- Around line 69-82: In publish(self, topic_name, data) add a check that all
required fields declared in self._topic_fields[topic_name] are present in the
incoming data (not just rejecting unexpected keys): compute the set difference
between declared fields and data.keys(), and if any are missing raise
DdsTopicError with a message listing the missing fields and valid fields; keep
the existing unknown-field validation and the rest of the logic
(super().publish, _total_published, _call_log) unchanged.
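The two-sided validation that comment describes (reject unknown fields, require declared ones) can be sketched standalone. The DdsTopicError name and the _topic_fields mapping shape come from the comment; everything else here is illustrative.

```python
class DdsTopicError(Exception):
    pass


topic_fields = {"sensor/temperature": ["value", "unit"]}  # stands in for self._topic_fields


def validate_publish(topic_name, data):
    declared = set(topic_fields[topic_name])
    # Existing behavior: reject fields that were never declared.
    unknown = set(data) - declared
    if unknown:
        raise DdsTopicError(
            f"Unknown fields {sorted(unknown)}; valid fields: {sorted(declared)}"
        )
    # Suggested addition: also require every declared field to be present.
    missing = declared - set(data)
    if missing:
        raise DdsTopicError(
            f"Missing fields {sorted(missing)}; valid fields: {sorted(declared)}"
        )


validate_publish("sensor/temperature", {"value": "21.5", "unit": "C"})  # passes

try:
    validate_publish("sensor/temperature", {"value": "21.5"})
    missing_raised = False
except DdsTopicError:
    missing_raised = True

try:
    validate_publish("sensor/temperature", {"value": "1", "unit": "C", "extra": "x"})
    unknown_raised = False
except DdsTopicError:
    unknown_raised = True
```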
In `@python/packages/jumpstarter-driver-dds/pyproject.toml`:
- Around line 23-25: The source_archive URL under
[tool.hatch.metadata.hooks.vcs.urls] in pyproject.toml uses the wrong repo slug;
update the source_archive value to point to the actual repository (replace
"https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" with
"https://github.com/jumpstarter-dev/jumpstarter/archive/{commit_hash}.zip") so
the generated source archive link is valid.
- Around line 11-14: The package manifest is missing the direct runtime
dependency on click even though jumpstarter_driver_dds/client.py imports and
uses click (e.g., command decorators and output calls); update
python/packages/jumpstarter-driver-dds/pyproject.toml by adding click (e.g.,
"click>=8.0" or your project's minimum supported click) to the dependencies list
so the package declares its own required runtime dependency.
In `@python/packages/jumpstarter-driver-dds/README.md`:
- Around line 101-105: The example uses the streaming API
dds.monitor("sensor/temperature") which never returns, so dds.disconnect() is
unreachable; update the README snippet to either show a bounded read (e.g.,
iterate a fixed number of samples) or wrap the monitor loop in try/finally to
ensure dds.disconnect() is called, or explicitly state that the example is
long-lived and must be interrupted manually; refer to the monitor() call and the
dds.disconnect() call when making the change.
In `@python/packages/jumpstarter-driver-gptp/examples/exporter.yaml`:
- Around line 4-13: The top-level key "drivers:" is deprecated and should be
renamed to "export:" to match the ExporterConfig schema; update the YAML so the
block beginning with "drivers:" is replaced with "export:" (keeping the nested
"gptp" entry, type jumpstarter_driver_gptp.driver.Gptp, and the existing config
fields like interface, domain, profile, transport, role, and sync_system_clock
unchanged) so the example validates against ExporterConfig.
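Per that comment, the corrected example would rename only the top-level key. A hedged sketch, not the exact file: the field values below are placeholders, since the comment names the fields but not their values.

```yaml
export:            # was the deprecated top-level key "drivers:"
  gptp:
    type: jumpstarter_driver_gptp.driver.Gptp
    config:
      interface: eth0          # assumed value
      domain: 0                # assumed value
      profile: gptp            # assumed value
      transport: UDPv4         # assumed value
      role: slave              # assumed value
      sync_system_clock: false # assumed value
```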
In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py`:
- Around line 113-129: The wait_for_sync loop currently swallows all exceptions;
update it to catch only expected transient errors (e.g., ConnectionError,
TimeoutError, OSError) when calling self.is_synchronized(), log the caught
exception at debug or warning level (use self.logger if available or the logging
module), and re-raise any other unexpected exceptions so critical failures
aren't hidden; specifically replace the bare `except Exception: pass` in
wait_for_sync with a narrowed `except (ConnectionError, TimeoutError, OSError)
as e:` that logs and continues, and an `except Exception:` that re-raises.
In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py`:
- Around line 53-58: The override of stop() (MockGptpBackend.stop) clears
several states but does not reset the base backend's _offset_ns back to 0.0,
which changes post-stop semantics; fix by preserving the base behavior — either
call super().stop() at the start or explicitly set self._offset_ns = 0.0 in
MockGptpBackend.stop() (and keep the existing resets of _started, _port_state,
_servo_state and _call_log) so the last synced offset is not leaked into later
assertions.
In
`@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py`:
- Around line 611-626: The test test_gptp_real_sync_software_timestamping
instantiates Gptp(interface=slave_iface, ...) in the default namespace while the
slave interface was moved into ns-ptp-slave; update the test so the driver and
its ptp4l child run inside the slave namespace by either: invoking serve(driver)
under the namespace (e.g., run the serve/Gptp process with ip netns exec
ns-ptp-slave) or by using the same pattern as the ptp_master fixture to wrap the
driver startup in the slave namespace; ensure you reference the existing
veth_pair variables (slave_ns, slave_iface), the Gptp constructor, and the
serve(client) call so the spawned ptp4l can find slave_iface.
In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py`:
- Around line 249-251: Replace the call to
asyncio.get_event_loop().create_task(...) with the idiomatic
asyncio.create_task(...) when scheduling the reader coroutine: change the
creation of self._reader_task (currently assigned via
asyncio.get_event_loop().create_task(self._read_ptp4l_output())) to use
asyncio.create_task(self._read_ptp4l_output()) so the _read_ptp4l_output
coroutine is scheduled directly and more idiomatically.
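The suggested change is a one-liner; here is a minimal runnable illustration of scheduling a background coroutine with asyncio.create_task() from inside a running loop. The _read_output body is a stand-in (the real _read_ptp4l_output parses ptp4l's stdout), and the log line is invented for the example.

```python
import asyncio


async def _read_output(lines):
    # Stand-in for the real reader that parses ptp4l output.
    lines.append("ptp4l: port state UNCALIBRATED")


async def main():
    lines = []
    # Instead of: asyncio.get_event_loop().create_task(_read_output(lines))
    reader_task = asyncio.create_task(_read_output(lines))
    await reader_task
    return lines


lines = asyncio.run(main())
```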
In `@python/packages/jumpstarter-driver-gptp/pyproject.toml`:
- Around line 25-27: The package metadata's source_archive URL is using the
wrong repository slug; update the [tool.hatch.metadata.hooks.vcs.urls] entry
named source_archive to point to the correct GitHub repo for the gPTP package
(replace "jumpstarter-dev/repo" with the actual repository slug for this
project) so the published metadata links to a valid archive; edit the
source_archive value in pyproject.toml to use the correct repo path and keep the
{commit_hash}.zip placeholder.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1bd84cc6-4bca-4158-9c59-7c50d844f7bf
⛔ Files ignored due to path filters (1)

python/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (24)
python/docs/source/reference/package-apis/drivers/dds.md
python/docs/source/reference/package-apis/drivers/gptp.md
python/docs/source/reference/package-apis/drivers/index.md
python/packages/jumpstarter-driver-dds/.gitignore
python/packages/jumpstarter-driver-dds/README.md
python/packages/jumpstarter-driver-dds/examples/exporter.yaml
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/__init__.py
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py
python/packages/jumpstarter-driver-dds/pyproject.toml
python/packages/jumpstarter-driver-gptp/.gitignore
python/packages/jumpstarter-driver-gptp/README.md
python/packages/jumpstarter-driver-gptp/examples/exporter.yaml
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/__init__.py
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py
python/packages/jumpstarter-driver-gptp/pyproject.toml
python/pyproject.toml
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py (outdated; comment resolved)

python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py (comment resolved)
```toml
dependencies = [
    "jumpstarter",
    "cyclonedds>=0.10.0",
]
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# First, find the DDS driver package location
fd -t f "pyproject.toml" | grep "jumpstarter-driver-dds"
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 124

🏁 Script executed:

```bash
#!/bin/bash
# Read the client.py file to check if click is imported
if [ -f "python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py" ]; then
  cat -n "python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py"
fi
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 5990

🏁 Script executed:

```bash
#!/bin/bash
# Read the full pyproject.toml for DDS driver
if [ -f "python/packages/jumpstarter-driver-dds/pyproject.toml" ]; then
  cat -n "python/packages/jumpstarter-driver-dds/pyproject.toml"
fi
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 1499

🏁 Script executed:

```bash
#!/bin/bash
# Check if click is available from transitive dependencies (check jumpstarter package)
if [ -f "python/packages/jumpstarter/pyproject.toml" ]; then
  echo "=== jumpstarter package dependencies ==="
  cat -n "python/packages/jumpstarter/pyproject.toml" | grep -A 20 "dependencies"
fi
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 754

🏁 Script executed:

```bash
#!/bin/bash
# Check pyserial driver for reference (mentioned in learnings as using transitive deps)
if [ -f "python/packages/jumpstarter-driver-pyserial/pyproject.toml" ]; then
  echo "=== pyserial package dependencies ==="
  cat -n "python/packages/jumpstarter-driver-pyserial/pyproject.toml" | grep -A 20 "dependencies"
fi
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 711
Declare click as a direct runtime dependency.
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py imports click directly (line 7) and uses it extensively in command decorators and output calls, but this package doesn't declare it in dependencies. The base jumpstarter package also doesn't provide it transitively. This violates the requirement that packages declare their own minimal, focused dependencies.
📦 Proposed manifest fix

```diff
 dependencies = [
     "jumpstarter",
+    "click",
     "cyclonedds>=0.10.0",
 ]
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```toml
dependencies = [
    "jumpstarter",
    "click",
    "cyclonedds>=0.10.0",
]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@python/packages/jumpstarter-driver-dds/pyproject.toml` around lines 11 - 14,
The package manifest is missing the direct runtime dependency on click even
though jumpstarter_driver_dds/client.py imports and uses click (e.g., command
decorators and output calls); update
python/packages/jumpstarter-driver-dds/pyproject.toml by adding click (e.g.,
"click>=8.0" or your project's minimum supported click) to the dependencies list
so the package declares its own required runtime dependency.
```python
def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool:
    """Block until PTP synchronization is achieved or timeout expires.

    :param timeout: Maximum time to wait in seconds
    :param poll_interval: Polling interval in seconds
    :returns: True if synchronized, False if timeout expired
    :rtype: bool
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if self.is_synchronized():
                return True
        except Exception:
            pass
        time.sleep(poll_interval)
    return False
```
Overly broad exception swallowing in wait_for_sync may hide critical failures.
The bare except Exception: pass silently ignores all errors during polling, including connection failures, authentication issues, or driver crashes. Consider catching only expected transient errors:
🛡️ Proposed fix to narrow exception handling

```diff
 def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool:
     """Block until PTP synchronization is achieved or timeout expires.

     :param timeout: Maximum time to wait in seconds
     :param poll_interval: Polling interval in seconds
     :returns: True if synchronized, False if timeout expired
     :rtype: bool
     """
+    from jumpstarter.client.core import DriverError
+
     deadline = time.monotonic() + timeout
     while time.monotonic() < deadline:
         try:
             if self.is_synchronized():
                 return True
-        except Exception:
-            pass
+        except DriverError:
+            # Expected during startup/transient states
+            pass
         time.sleep(poll_interval)
     return False
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool:
    """Block until PTP synchronization is achieved or timeout expires.

    :param timeout: Maximum time to wait in seconds
    :param poll_interval: Polling interval in seconds
    :returns: True if synchronized, False if timeout expired
    :rtype: bool
    """
    from jumpstarter.client.core import DriverError

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if self.is_synchronized():
                return True
        except DriverError:
            # Expected during startup/transient states
            pass
        time.sleep(poll_interval)
    return False
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py`
around lines 113 - 129, The wait_for_sync loop currently swallows all
exceptions; update it to catch only expected transient errors (e.g.,
ConnectionError, TimeoutError, OSError) when calling self.is_synchronized(), log
the caught exception at debug or warning level (use self.logger if available or
the logging module), and re-raise any other unexpected exceptions so critical
failures aren't hidden; specifically replace the bare `except Exception: pass`
in wait_for_sync with a narrowed `except (ConnectionError, TimeoutError,
OSError) as e:` that logs and continues, and an `except Exception:` that
re-raises.
def stop(self):
    self.require_started()
    self._started = False
    self._port_state = "INITIALIZING"
    self._servo_state = "s0"
    self._call_log.append("stop")
Preserve the base backend's stop-state resets.
MockGptpBackend.stop() clears _offset_ns back to 0.0, but this override leaves the last synced offset behind. That changes the post-stop semantics of the drop-in backend and can leak stale state into later assertions.
♻️ Proposed fix
def stop(self):
- self.require_started()
- self._started = False
- self._port_state = "INITIALIZING"
- self._servo_state = "s0"
+ super().stop()
        self._call_log.append("stop")

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Before:

def stop(self):
    self.require_started()
    self._started = False
    self._port_state = "INITIALIZING"
    self._servo_state = "s0"
    self._call_log.append("stop")

After:

def stop(self):
    super().stop()
    self._call_log.append("stop")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py`
around lines 53 - 58, The override of stop() (MockGptpBackend.stop) clears
several states but does not reset the base backend's _offset_ns back to 0.0,
which changes post-stop semantics; fix by preserving the base behavior — either
call super().stop() at the start or explicitly set self._offset_ns = 0.0 in
MockGptpBackend.stop() (and keep the existing resets of _started, _port_state,
_servo_state and _call_log) so the last synced offset is not leaked into later
assertions.
def test_gptp_real_sync_software_timestamping(self, veth_pair, ptp_master):
    import time
    _, _, slave_ns, slave_iface = veth_pair
    driver = Gptp(
        interface=slave_iface, domain=0, profile="default",
        transport="UDPv4", role="slave", sync_system_clock=False,
    )
    with serve(driver) as client:
        client.start()
        time.sleep(10)
        status = client.status()
        assert status.port_state == PortState.SLAVE
        offset = client.get_offset()
        assert abs(offset.offset_from_master_ns) < 10_000_000
        assert client.is_synchronized() is True
        client.stop()
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether the test expects to run inside the namespace or if there's additional setup
# Check if there's any namespace execution wrapper in conftest or the driver
rg -n "netns exec|nsenter|setns" python/packages/jumpstarter-driver-gptp/
🏁 Script executed:
# Get the full test function and surrounding context
sed -n '570,635p' python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py
🏁 Script executed:
# Find the veth_pair fixture definition
rg -n "def veth_pair|@pytest.fixture.*veth_pair" python/packages/jumpstarter-driver-gptp/ -A 20
🏁 Script executed:
# Check the serve function implementation
rg -n "def serve" python/packages/jumpstarter-driver-gptp/ -A 15
🏁 Script executed:
# Search for serve function definition - it should be in the same test file
rg -n "def serve" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py -A 10
🏁 Script executed:
# Check the Gptp class to see if it supports namespace handling
rg -n "class Gptp|def __init__" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py -A 20 | head -60
🏁 Script executed:
# Check if there's a namespace context manager or parameter in Gptp
rg -n "netns|namespace" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py
🏁 Script executed:
# Search for serve function in conftest or test files
find python/packages/jumpstarter-driver-gptp -name "conftest.py" -o -name "*.py" | xargs grep -l "def serve" 2>/dev/null
🏁 Script executed:
# Check imports at the top of driver_test.py
head -50 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py
🏁 Script executed:
# Search for serve in the entire jumpstarter-driver-gptp package
rg -n "from .* import serve|def serve" python/packages/jumpstarter-driver-gptp/
🏁 Script executed:
# Find and examine the serve function
find python -name "utils.py" -path "*/common/*" | xargs grep -l "def serve" 2>/dev/null
🏁 Script executed:
# Get the serve function implementation
find python -name "utils.py" -path "*/common/*" -exec grep -A 30 "def serve" {} + | head -50
🏁 Script executed:
# Check Gptp.start() implementation to see how it accesses the interface
rg -n "async def start|def start" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py -A 20 | head -80
🏁 Script executed:
# Check if Gptp spawns subprocess for ptp4l and how it accesses the interface
rg -n "ptp4l|asyncio.subprocess" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py -B 2 -A 5 | head -100
🏁 Script executed:
# Confirm that interfaces moved to namespace are not accessible from default namespace
# by checking Linux network namespace behavior
cat << 'EOF' > /tmp/test_netns.py
# When an interface is moved to a namespace with:
# ip link set veth-s netns ns-ptp-slave
# That interface is NO LONGER visible in the default/host namespace
# ptp4l spawned in the host namespace will fail to find that interface
# This is fundamental Linux network namespace isolation behavior
print("Linux namespace isolation confirmed:")
print("1. veth-s moved to ns-ptp-slave via: ip link set veth-s netns ns-ptp-slave")
print("2. ptp4l spawned in default namespace CANNOT access veth-s")
print("3. ptp4l will fail with 'Cannot find interface' or similar error")
EOF
python /tmp/test_netns.py
🏁 Script executed:
# Check if there are any other tests in the file that properly handle namespace execution
rg -n "ip netns exec|nsenter" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py
Driver must execute inside the slave network namespace.
The test moves veth-s into ns-ptp-slave (line 584: ip link set veth-s netns ns-ptp-slave), making it invisible in the default namespace. However, Gptp(interface=slave_iface, ...) is instantiated without namespace context, causing the spawned ptp4l process to fail finding the interface.
Compare with ptp_master fixture (lines 597-607), which correctly wraps ptp4l with ip netns exec ns-ptp-master. The test should wrap the driver invocation similarly using ip netns exec ns-ptp-slave or pass the namespace to serve() and Gptp.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py`
around lines 611 - 626, The test test_gptp_real_sync_software_timestamping
instantiates Gptp(interface=slave_iface, ...) in the default namespace while the
slave interface was moved into ns-ptp-slave; update the test so the driver and
its ptp4l child run inside the slave namespace by either: invoking serve(driver)
under the namespace (e.g., run the serve/Gptp process with ip netns exec
ns-ptp-slave) or by using the same pattern as the ptp_master fixture to wrap the
driver startup in the slave namespace; ensure you reference the existing
veth_pair variables (slave_ns, slave_iface), the Gptp constructor, and the
serve(client) call so the spawned ptp4l can find slave_iface.
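As a sketch of one way to wrap the driver's child process in the slave namespace, the fix amounts to prefixing the spawned command with `ip netns exec`. The helper names below are illustrative, not part of the existing driver API; the approach requires root (or CAP_SYS_ADMIN) and the iproute2 `ip` tool:

```python
import subprocess

def netns_wrap(namespace: str, argv: list[str]) -> list[str]:
    """Prefix a command so it runs inside the given network namespace."""
    return ["ip", "netns", "exec", namespace, *argv]

def run_in_netns(namespace: str, argv: list[str]) -> subprocess.CompletedProcess:
    # e.g. run_in_netns("ns-ptp-slave", ["ptp4l", "-i", "veth-s", "-m"])
    return subprocess.run(netns_wrap(namespace, argv),
                          capture_output=True, text=True)
```

Applied to this test, the ptp4l invocation spawned by the driver would be wrapped the same way the `ptp_master` fixture already wraps its process.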
self._reader_task = asyncio.get_event_loop().create_task(
    self._read_ptp4l_output()
)
🧩 Analysis chain
🌐 Web query:
Is asyncio.get_event_loop() deprecated in Python 3.10+ when called from a coroutine?
💡 Result:
No, asyncio.get_event_loop is not deprecated when called from a coroutine in Python 3.10+. When called from a coroutine (or callback), it returns the running event loop without emitting a DeprecationWarning. The deprecation applies specifically to cases where there is no running event loop and no current loop set in the policy (typically synchronous code outside a running loop). The official documentation explicitly states: "When called from a coroutine or a callback (e.g. scheduled with call_soon or similar API), this function will always return the running event loop." The DeprecationWarning is emitted only if there is no running event loop and the policy's get_event_loop is called to create or retrieve one, which does not happen in a coroutine context where a loop is running. Instead, use asyncio.get_running_loop in coroutines as the preferred method, though get_event_loop continues to work without warning in that context.
Citations:
- 1: https://docs.python.org/3.10/library/asyncio-eventloop.html
- 2: https://docs.python.org/3.12/library/asyncio-eventloop.html
- 3: https://docs.python.org/3.11/library/asyncio-eventloop.html
- 4: https://docs.python.org/3.12/library/asyncio-policy.html
- 5: DeprecationWarning scope expanded in asyncio.events python/cpython#100160
🏁 Script executed:
# Check the file to understand the context better
cd python/packages/jumpstarter-driver-gptp && head -n 260 jumpstarter_driver_gptp/driver.py | tail -n 20Repository: jumpstarter-dev/jumpstarter
Length of output: 734
Use asyncio.create_task() directly instead of asyncio.get_event_loop().create_task().
In async methods, you can create tasks directly without explicitly accessing the event loop. This is the idiomatic approach and avoids unnecessary intermediate calls.
🐛 Proposed fix
- self._reader_task = asyncio.get_event_loop().create_task(
+ self._reader_task = asyncio.create_task(
self._read_ptp4l_output()
)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py`
around lines 249 - 251, Replace the call to
asyncio.get_event_loop().create_task(...) with the idiomatic
asyncio.create_task(...) when scheduling the reader coroutine: change the
creation of self._reader_task (currently assigned via
asyncio.get_event_loop().create_task(self._read_ptp4l_output())) to use
asyncio.create_task(self._read_ptp4l_output()) so the _read_ptp4l_output
coroutine is scheduled directly and more idiomatically.
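A self-contained illustration of the recommended pattern, where `read_output` is only a stand-in for the driver's `_read_ptp4l_output` coroutine:

```python
import asyncio

async def read_output() -> str:
    # Stand-in for the driver's reader coroutine.
    await asyncio.sleep(0)
    return "master offset 42"

async def main() -> str:
    # Inside a running coroutine, asyncio.create_task() schedules the
    # task on the current loop; no explicit get_event_loop() needed.
    reader_task = asyncio.create_task(read_output())
    return await reader_task

print(asyncio.run(main()))  # → master offset 42
```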
[tool.hatch.metadata.hooks.vcs.urls]
Homepage = "https://jumpstarter.dev"
source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"
Fix the source_archive repository slug here too.
This points at jumpstarter-dev/repo, so the published metadata will link to a non-existent archive for the gPTP package as well.
🔗 Proposed metadata fix
-source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"
+source_archive = "https://github.com/jumpstarter-dev/jumpstarter/archive/{commit_hash}.zip"

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Before:

[tool.hatch.metadata.hooks.vcs.urls]
Homepage = "https://jumpstarter.dev"
source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"

After:

[tool.hatch.metadata.hooks.vcs.urls]
Homepage = "https://jumpstarter.dev"
source_archive = "https://github.com/jumpstarter-dev/jumpstarter/archive/{commit_hash}.zip"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@python/packages/jumpstarter-driver-gptp/pyproject.toml` around lines 25 - 27,
The package metadata's source_archive URL is using the wrong repository slug;
update the [tool.hatch.metadata.hooks.vcs.urls] entry named source_archive to
point to the correct GitHub repo for the gPTP package (replace
"jumpstarter-dev/repo" with the actual repository slug for this project) so the
published metadata links to a valid archive; edit the source_archive value in
pyproject.toml to use the correct repo path and keep the {commit_hash}.zip
placeholder.
Add jumpstarter-driver-dds package providing DDS (Data Distribution Service) pub/sub messaging via Eclipse CycloneDDS. Includes domain participation, topic management with configurable QoS, publish/subscribe, streaming monitor, and a CLI interface. Comprehensive test suite with 58 tests covering: - Pydantic model validation - MockDdsBackend unit tests - Full gRPC e2e tests via MockDds driver - Error path coverage - Stateful lifecycle tests (connection, topic, pub/sub, reconnect) - Call log/audit trail tests - End-to-end workflow scenarios Made-with: Cursor
🧹 Nitpick comments (2)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py (1)
40-53: Consider whether `_call_log` should be cleared on disconnect.

`disconnect()` resets `_total_published` and `_total_read` but preserves `_call_log`. If the intent is to reset all session state, this is inconsistent; if the intent is to maintain a persistent audit trail across reconnects, this is correct but should be documented.

Looking at `test_stateful_call_log_records_operations`, the test expects the full log including `disconnect`, so the current behavior appears intentional for audit purposes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py` around lines 40 - 53, The disconnect() method resets session counters but leaves _call_log intact which is intentional per test_stateful_call_log_records_operations; add a brief clarifying comment inside disconnect (near the _call_log append) stating that _call_log is preserved across disconnect/reconnect to serve as a persistent audit trail so future readers understand this is deliberate (reference disconnect, _call_log, and test_stateful_call_log_records_operations).python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py (1)
371-379: CLI test provides minimal smoke coverage.

The test verifies the CLI interface exists and has expected commands, but only checks `topics` and `info`. Consider expanding to verify all expected CLI commands (`connect`, `disconnect`, `read`, `monitor`) are present.

🔧 Suggested expansion

 def test_cli_interface(self):
     with serve(MockDds()) as client:
         cli = client.cli()
         assert hasattr(cli, "commands")
-        assert "topics" in cli.commands
-        assert "info" in cli.commands
+        # Verify all expected DDS CLI commands are registered
+        expected = {"connect", "disconnect", "topics", "info", "read", "monitor"}
+        assert expected.issubset(set(cli.commands.keys()))

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py` around lines 371 - 379, The test TestClientCli.test_cli_interface currently only asserts that cli.commands contains "topics" and "info"; expand it to also assert presence of the other expected CLI commands by adding assertions that "connect", "disconnect", "read", and "monitor" are keys in cli.commands (leave the existing checks for "topics" and "info" intact), locating the CLI via serve(MockDds()) and the cli variable used in this test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py`:
- Around line 40-53: The disconnect() method resets session counters but leaves
_call_log intact which is intentional per
test_stateful_call_log_records_operations; add a brief clarifying comment inside
disconnect (near the _call_log append) stating that _call_log is preserved
across disconnect/reconnect to serve as a persistent audit trail so future
readers understand this is deliberate (reference disconnect, _call_log, and
test_stateful_call_log_records_operations).
In
`@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py`:
- Around line 371-379: The test TestClientCli.test_cli_interface currently only
asserts that cli.commands contains "topics" and "info"; expand it to also assert
presence of the other expected CLI commands by adding assertions that "connect",
"disconnect", "read", and "monitor" are keys in cli.commands (leave the existing
checks for "topics" and "info" intact), locating the CLI via serve(MockDds())
and the cli variable used in this test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fb37f15a-e50c-44c6-a488-6d7775a082bc
⛔ Files ignored due to path filters (1)
python/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (13)
python/docs/source/reference/package-apis/drivers/dds.mdpython/docs/source/reference/package-apis/drivers/index.mdpython/packages/jumpstarter-driver-dds/.gitignorepython/packages/jumpstarter-driver-dds/README.mdpython/packages/jumpstarter-driver-dds/examples/exporter.yamlpython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/__init__.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.pypython/packages/jumpstarter-driver-dds/pyproject.tomlpython/pyproject.toml
✅ Files skipped from review due to trivial changes (7)
- python/docs/source/reference/package-apis/drivers/dds.md
- python/packages/jumpstarter-driver-dds/.gitignore
- python/docs/source/reference/package-apis/drivers/index.md
- python/packages/jumpstarter-driver-dds/examples/exporter.yaml
- python/packages/jumpstarter-driver-dds/pyproject.toml
- python/packages/jumpstarter-driver-dds/README.md
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
🚧 Files skipped from review as they are similar to previous changes (3)
- python/pyproject.toml
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
- Raise TypeError on malformed list_topics() response instead of silently returning [] - Add explicit name= to CLI commands to avoid _cmd suffixes - Add is_connected property and has_topic() to backends for encapsulation - Use public backend API in monitor() and close() instead of accessing internals - Validate missing required fields in StatefulDdsBackend.publish() - Document string-only field limitation in _make_idl_type - Document _call_log persistence across disconnect in StatefulDdsBackend - Expand CLI test to verify all registered commands - Add comprehensive docstrings across all modules Made-with: Cursor
🧹 Nitpick comments (1)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py (1)
436-454: Consider making the monitor iteration limit configurable.

The `monitor()` method has a hardcoded limit of 100 iterations with 0.1s sleep (total ~10s), which may not suit all use cases. The client-side `monitor_cmd` already has a `--count` option, but the server-side generator will stop after 100 iterations regardless.

Consider adding a `max_iterations` parameter or using an infinite loop, letting the client control termination via the streaming protocol's cancellation.

Proposed approach
 @export
-async def monitor(self, topic_name: str) -> AsyncGenerator[DdsSample, None]:
+async def monitor(self, topic_name: str, max_iterations: int = 0) -> AsyncGenerator[DdsSample, None]:
     """Stream data samples from a topic as they arrive.
-    Polls the topic reader periodically and yields new samples.
+    Polls the topic reader periodically and yields new samples.
+    If max_iterations is 0, polls indefinitely until cancelled.
     """
     import asyncio
     if not self._backend.is_connected:
         raise RuntimeError("Not connected -- call connect() first")
     if not self._backend.has_topic(topic_name):
         raise ValueError(f"Topic '{topic_name}' not registered")
-    for _ in range(100):
+    iterations = 0
+    while max_iterations == 0 or iterations < max_iterations:
         result = self._backend.read(topic_name, max_samples=10)
         for sample in result.samples:
             yield sample
         await asyncio.sleep(0.1)
+        iterations += 1

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py` around lines 436 - 454, The monitor method currently stops after a hardcoded 100 iterations; change its signature (monitor) to accept an optional max_iterations: Optional[int] = None (or max_iterations: int = 100) and use that parameter instead of the literal 100 in the for loop, treating None as infinite looping (e.g., while True) so the generator only stops when max_iterations is reached or when the caller cancels the stream; ensure you keep existing checks (_backend.is_connected, _backend.has_topic) and continue calling _backend.read(topic_name, max_samples=10) with the same await asyncio.sleep(0.1) behavior so existing consumers (and monitor_cmd) can control termination.
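The bounded/unbounded polling pattern the suggestion describes can be sketched standalone. `FakeBackend` and the string samples below are stand-ins, not the driver's real API:

```python
import asyncio
from typing import AsyncGenerator

class FakeBackend:
    """Stand-in for the driver backend: returns one sample per poll."""
    def __init__(self) -> None:
        self._n = 0

    def read(self, topic_name: str, max_samples: int = 10) -> list[str]:
        self._n += 1
        return [f"{topic_name}#{self._n}"]

async def monitor(backend: FakeBackend, topic_name: str,
                  max_iterations: int = 0) -> AsyncGenerator[str, None]:
    # max_iterations == 0 means poll until the consumer cancels the stream.
    iterations = 0
    while max_iterations == 0 or iterations < max_iterations:
        for sample in backend.read(topic_name):
            yield sample
        await asyncio.sleep(0)  # real driver sleeps 0.1s between polls
        iterations += 1

async def collect(n: int) -> list[str]:
    out = []
    async for s in monitor(FakeBackend(), "sensor", max_iterations=n):
        out.append(s)
    return out

print(asyncio.run(collect(3)))  # → ['sensor#1', 'sensor#2', 'sensor#3']
```

With `max_iterations=0` the generator never terminates on its own, so the client's `--count` option (or stream cancellation) controls when it stops.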
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`:
- Around line 436-454: The monitor method currently stops after a hardcoded 100
iterations; change its signature (monitor) to accept an optional max_iterations:
Optional[int] = None (or max_iterations: int = 100) and use that parameter
instead of the literal 100 in the for loop, treating None as infinite looping
(e.g., while True) so the generator only stops when max_iterations is reached or
when the caller cancels the stream; ensure you keep existing checks
(_backend.is_connected, _backend.has_topic) and continue calling
_backend.read(topic_name, max_samples=10) with the same await asyncio.sleep(0.1)
behavior so existing consumers (and monitor_cmd) can control termination.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 233df602-d076-4ad0-9ddd-15fd9e9cfe38
📒 Files selected for processing (4)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.pypython/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py
raballew
left a comment
As for the other PRs, replace asyncio with anyio, as it's the project convention. Also look at the comments below.
if key not in fields:
    raise ValueError(f"Unknown field '{key}' for topic '{topic_name}'")

sample = DdsSample(topic_name=topic_name, data=data, timestamp=time.time())
This line stores the partial data dict as-is, but the real DdsBackend.publish at line 178 does idl_type(**data) which fills in missing fields with empty strings (via the dataclass defaults at line 65). So if you publish {"x": "10"} to a topic with fields ["x", "y"]:
- Real backend: a consumer reads {"x": "10", "y": ""}
- Mock backend: a consumer reads {"x": "10"}
Also, unknown keys raise TypeError from the dataclass constructor in the real backend (line 178) but ValueError here (line 300). Code that catches ValueError will pass in tests but miss the TypeError in production.
And publishing {} succeeds silently in DdsBackend (all defaults), stores an empty dict here, and raises DdsTopicError in StatefulDdsBackend (conftest.py:84-88). Three backends, three behaviors.
The root cause is that there is no shared Protocol or ABC between backends (line 347 uses a union type). Consider filling defaults in mock publish to match real behavior, and normalizing exception types.
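The dataclass behavior the comment describes can be reproduced in isolation with the stdlib. The field names and empty-string defaults here are illustrative, mirroring what the generated IDL struct reportedly does:

```python
from dataclasses import dataclass, asdict

@dataclass
class SensorSample:
    # Mirrors a generated struct where every field defaults to "".
    x: str = ""
    y: str = ""

# Missing fields are filled with their defaults...
print(asdict(SensorSample(**{"x": "10"})))  # → {'x': '10', 'y': ''}

# ...while unknown keys raise TypeError, not ValueError.
try:
    SensorSample(**{"x": "10", "z": "boom"})
except TypeError as e:
    print("rejected:", e)
```

This is why code that catches ValueError against the mock passes in tests but misses the TypeError raised by the real dataclass constructor.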
    history_depth: int | None = None,
) -> DdsTopicInfo:
    """Create a topic on the mock backend."""
    qos = DdsTopicQos()
This uses bare DdsTopicQos() (Pydantic defaults), but Dds.create_topic at line 403 uses self._default_qos() which reads from the driver's config fields (default_reliability, default_durability, default_history_depth at lines 342-344). MockDds (lines 467-468) has none of those config fields and no _default_qos() method.
So if someone configures Dds(default_reliability="BEST_EFFORT"), topics get BEST_EFFORT QoS. But MockDds always creates topics with RELIABLE QoS regardless of any config. Tests using MockDds with non-default QoS silently diverge from production.
Could you add the three config fields and a _default_qos() to MockDds, and call it here instead of DdsTopicQos()?
@dataclass(kw_only=True, config=ConfigDict(arbitrary_types_allowed=True))
class MockDds(Driver):
Dds.close() (lines 363-370) explicitly calls self._backend.disconnect() before super().close(). MockDds has no close() override, so the internal backend's disconnect() is never called on teardown. Not a resource leak per se (no native resources), but it is a lifecycle asymmetry between the two driver classes. Adding a close() that mirrors the Dds pattern would be a small fix.
from cyclonedds.idl import IdlStruct

cls_name = topic_name.replace("/", "_").replace("-", "_").replace(".", "_") + "_Type"
This sanitization maps distinct topic names to the same Python class name. For example, "sensor/temp" and "sensor-temp" both produce "sensor_temp_Type". The wire-level DDS topic identity is fine (CycloneDDS uses the raw name string at line 144), so this won't cause data corruption on the bus. But at the Python level you get confusing debugging and potential namespace issues.
Also, no validation that the result is a valid Python identifier. Something like "123-topic" would produce "123_topic_Type" which is not a valid class name.
A hash suffix or reversible encoding would prevent collisions, plus an str.isidentifier() check would catch invalid names early.
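One collision-resistant variant of the sanitization is sketched below; the helper name and suffix scheme are illustrative, not the driver's actual implementation:

```python
import hashlib
import re

def make_type_name(topic_name: str) -> str:
    """Derive a unique, valid Python class name from a DDS topic name."""
    sanitized = re.sub(r"[^0-9A-Za-z_]", "_", topic_name)
    if not sanitized or sanitized[0].isdigit():
        sanitized = "T_" + sanitized  # identifiers cannot start with a digit
    # A short hash of the *raw* name keeps distinct topics distinct even
    # when sanitization alone would collide ("sensor/temp" vs "sensor-temp").
    suffix = hashlib.sha1(topic_name.encode()).hexdigest()[:8]
    name = f"{sanitized}_{suffix}_Type"
    assert name.isidentifier()
    return name

print(make_type_name("sensor/temp") == make_type_name("sensor-temp"))  # → False
```

The wire-level topic name stays the raw string; only the Python-side class name changes.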
    """Tear down all DDS entities and release the participant."""
    if not self._connected:
        raise RuntimeError("Not connected to DDS domain")
    self._writers.clear()
This clears the Python dicts and sets self._participant = None, but never calls .close() on the CycloneDDS DataWriter, DataReader, Topic, or DomainParticipant objects. CycloneDDS wraps C objects, and while it likely implements __del__ cleanup, relying on GC for native resource release is fragile (especially with reference cycles or interpreter shutdown).
Calling .close() on writers, readers, topics, and participant (in reverse creation order) before clearing the dicts would be the safe approach.
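The reverse-creation-order teardown can be shown generically; the `Resource` class is only a stand-in for the CycloneDDS entities, each of which exposes a `.close()`:

```python
class Resource:
    """Stand-in for a native-backed entity with a .close() method."""
    closed_order: list[str] = []

    def __init__(self, name: str) -> None:
        self.name = name

    def close(self) -> None:
        Resource.closed_order.append(self.name)

# Created participant -> topic -> writer -> reader; close in reverse,
# so dependents are released before the objects they depend on.
created = [Resource("participant"), Resource("topic"),
           Resource("writer"), Resource("reader")]
for res in reversed(created):
    res.close()

print(Resource.closed_order)  # → ['reader', 'writer', 'topic', 'participant']
```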
) -> DdsTopicInfo:
    """Create a DDS topic with the given schema and QoS."""
    return DdsTopicInfo.model_validate(
        self.call("create_topic", name, fields, reliability, durability, history_depth)
Passing name, fields, reliability, durability, history_depth as positional args to self.call() is fragile. If the server-side method signature changes parameter order, this silently breaks. Using keyword arguments would be more resilient across the gRPC remoting layer.
Valid point, though a change here would mean a framework change as DriverClient.call() has signature call(method, *args) — positional-only is the project-wide convention used by all drivers. This would need to support kwargs.
DdsTopicInfo(
    name=name,
    fields=fields,
    qos=self._qos_map.get(name, DdsTopicQos()),
Both self._qos_map.get(name, DdsTopicQos()) here and self._sample_counts.get(name, 0) at line 166 can never hit their defaults because create_topic() always populates both dicts. Same pattern at line 180. The .get() fallbacks mask potential bugs: if a topic somehow ends up in _topics but not in _qos_map, you'd silently get wrong QoS instead of a KeyError that would surface the bug.
topic_name: str
samples: list[DdsSample] = []
sample_count: int = 0
Nothing prevents creating DdsReadResult(samples=[s1, s2], sample_count=0). A Pydantic model_validator enforcing sample_count == len(samples) (or just making sample_count computed) would catch structural inconsistencies early.
Fixed. Added a Pydantic model_validator that enforces sample_count == len(samples).
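The invariant can be illustrated with a stdlib stand-in for the Pydantic validator. The real model uses `pydantic.model_validator`; this dataclass version only demonstrates the check itself:

```python
from dataclasses import dataclass, field

@dataclass
class ReadResult:
    topic_name: str
    samples: list[str] = field(default_factory=list)
    sample_count: int = 0

    def __post_init__(self) -> None:
        # Same invariant the Pydantic model_validator enforces.
        if self.sample_count != len(self.samples):
            raise ValueError(
                f"sample_count={self.sample_count} != len(samples)={len(self.samples)}"
            )

ReadResult(topic_name="t", samples=["a", "b"], sample_count=2)  # OK
try:
    ReadResult(topic_name="t", samples=["a", "b"], sample_count=0)
except ValueError as e:
    print("rejected:", e)
```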
if self._backend.is_connected:
    try:
        self._backend.disconnect()
    except Exception:
This catches all exceptions during disconnect, including things like AttributeError or TypeError that would indicate real bugs rather than expected cleanup failures. A narrower catch (e.g., RuntimeError or the specific CycloneDDS exception types) would let actual programming errors propagate.
```python
@base.command(name="monitor")
@click.argument("topic_name")
@click.option("--count", "-n", default=10, help="Number of events")
```
This option suggests the user can control how many events to receive, but the server-side monitor() (driver.py:449) always exits after 100 iterations regardless. If someone passes --count 500, they will get at most 100 iterations worth of samples with no indication that the server stopped early. Worth either documenting the ceiling or making the server-side limit configurable.
Server now polls indefinitely (no 100-iteration cap), so --count works as advertised.
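The resulting loop shape can be sketched as a generator. The names and the 0-means-unlimited convention are assumptions mirroring the described fix, not the driver's exact code:

```python
import itertools
from typing import Callable, Iterator


def monitor(poll: Callable[[], list[str]], max_iterations: int = 0) -> Iterator[str]:
    """Yield samples from repeated polls.

    max_iterations=0 polls indefinitely, so a client-side --count decides
    when to stop; a positive value caps the server-side loop instead.
    """
    iterations = itertools.count() if max_iterations == 0 else range(max_iterations)
    for _ in iterations:
        yield from poll()
```

A client asking for `--count 500` would then simply take 500 items from the stream and close it.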
- Replace asyncio with anyio (project convention)
- Normalize publish behavior: mock backend now fills missing fields with "" defaults and raises ValueError for unknown fields, matching the real CycloneDDS dataclass behavior
- Add QoS config fields and _default_qos() to MockDds for parity with Dds
- Add close() override to MockDds to mirror Dds lifecycle
- Explicitly close CycloneDDS writers/readers/topics/participant in disconnect() instead of relying on GC
- Make monitor() max_iterations configurable (0 = unlimited), add mid-loop disconnect handling for graceful exit
- Add create-topic and publish CLI commands
- Pin cyclonedds>=0.10.0,<1.0 to prevent native compat breakage
- Fix _make_idl_type name collisions with hash suffix and isidentifier check
- Add Dds(use_mock=True) test coverage
- Use dataclasses.fields() instead of __dataclass_fields__
- Remove always-True success field from DdsPublishResult
- Add DdsReadResult model_validator for sample_count consistency
- Remove .get() fallbacks on always-populated dicts
- Narrow close() exception catch to RuntimeError
- Document read/monitor mutual exclusivity on same topic
- Fix source_archive URL in pyproject.toml
- Fix README monitor example to show bounded usage
- Align StatefulDdsBackend enforcement with real/mock backends
Add parameterized backend parity tests (mock + stateful), driver interface parity checks, CLI completeness assertions, Pydantic model invariant guards, _make_idl_type collision tests, and an asyncio convention enforcer to prevent recurrence of the issues identified during code review.
Actionable comments posted: 4
♻️ Duplicate comments (1)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py (1)
118-137: ⚠️ Potential issue | 🟠 Major: Don't hide teardown bugs behind `except Exception: pass`.

These handlers suppress every cleanup failure and still mark the backend disconnected, so an AttributeError or TypeError in shutdown looks the same as a clean teardown. Catch only the expected DDS shutdown exceptions and log them; let unexpected bugs surface.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py` around lines 118-137: the teardown currently swallows all errors by using bare "except Exception: pass" for writer.close(), reader.close(), topic.close(), and participant.close(). Change each broad except to explicitly handle known DDS shutdown exceptions (e.g., the DDS library's specific exception class) and log them via the existing logger (logging.exception or logger.error with exc_info), while letting unexpected exceptions propagate so programming errors surface. Apply this consistently in the loops over self._writers, self._readers, self._topics and in the self._participant close block.
🧹 Nitpick comments (1)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py (1)
1-224: Consider folding this suite into `driver_test.py`.

Keeping driver tests split across `driver_test.py` and `regression_test.py` makes the canonical package test entrypoint less obvious than in the rest of the driver packages. Based on learnings, "Each driver package must include comprehensive tests in `driver_test.py`."

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py` around lines 1-224: fold the regression_test.py suite into driver_test.py to centralize driver tests. Move the test classes (TestBackendParity, TestDriverInterfaceParity, TestCliCompleteness, TestModelInvariants, TestIdlTypeCollisions, TestProjectConventions) and their test_* methods into driver_test.py, update the imports from .driver and .common (e.g., Dds, MockDds, _make_idl_type, DdsReadResult, DdsSample, DdsPublishResult, DdsTopicQos, serve), delete regression_test.py, and run pytest to confirm discovery with no duplicate test names or import conflicts (adjusting relative imports and fixtures like any_backend as needed).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 211b6b58-18ac-4144-8346-c7279b4c1f26
⛔ Files ignored due to path filters (1)
`python/uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (8)
- python/packages/jumpstarter-driver-dds/README.md
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py
- python/packages/jumpstarter-driver-dds/pyproject.toml
✅ Files skipped from review due to trivial changes (3)
- python/packages/jumpstarter-driver-dds/pyproject.toml
- python/packages/jumpstarter-driver-dds/README.md
- python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py
```python
class DdsTopicQos(BaseModel):
    """Quality of Service settings for a DDS topic."""

    reliability: DdsReliability = DdsReliability.RELIABLE
    durability: DdsDurability = DdsDurability.VOLATILE
    history_depth: int = 10
```
Reject non-positive history_depth.
Zero or negative depths are accepted here today. In the mock backend that turns trimming into buggy buf[-history_depth:] slicing; in the real backend the same value is forwarded into DDS QoS. Please enforce history_depth >= 1 before the value reaches either backend.
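With Pydantic, the constraint is a single `Field` annotation. A minimal sketch of the relevant field only, not the full `DdsTopicQos` model:

```python
from pydantic import BaseModel, Field


class DdsTopicQos(BaseModel):
    # ge=1 rejects zero/negative depths before either backend sees them.
    history_depth: int = Field(default=10, ge=1)
```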
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py`
around lines 25 - 30, DdsTopicQos currently allows zero/negative history_depth;
enforce history_depth >= 1 by adding validation to the DdsTopicQos model: either
change the field declaration to use pydantic Field with ge=1 and default 10
(e.g., history_depth: int = Field(10, ge=1)) or add a pydantic `@validator` for
"history_depth" that raises a ValueError if value < 1, so invalid values are
rejected before reaching the mock or real DDS backends.
```python
def _make_idl_type(topic_name: str, fields: list[str]):
    """Dynamically create a CycloneDDS IdlStruct type for the given fields.

    Each field name maps to a ``str`` type. For complex or mixed-type
    schemas, define custom IdlStruct subclasses directly and register
    them with the backend.

    A hash suffix is appended to the generated class name to prevent
    collisions when distinct topic names sanitise to the same identifier
    (e.g. ``"sensor/temp"`` and ``"sensor-temp"``).
    """
    from cyclonedds.idl import IdlStruct

    sanitised = topic_name.replace("/", "_").replace("-", "_").replace(".", "_")
    hash_suffix = hashlib.md5(topic_name.encode()).hexdigest()[:8]
    cls_name = f"{sanitised}_{hash_suffix}_Type"
    if not cls_name.isidentifier():
        cls_name = f"Topic_{hash_suffix}_Type"
    dc_fields = [(f, str, dc.field(default="")) for f in fields]
    idl_cls = dc.make_dataclass(cls_name, dc_fields, bases=(IdlStruct,))
    return idl_cls
```
Validate topic field names consistently across both backends.
_make_idl_type() depends on Python dataclass field rules, so duplicate names or non-identifiers/keywords will fail on the CycloneDDS path. MockDdsBackend.create_topic() currently accepts those schemas, which lets mock-backed tests pass on topics that production cannot create. Add one shared field validator and call it from both backends before registering the topic.
You can verify the Python-side constraint with:
```shell
#!/bin/bash
python - <<'PY'
import dataclasses as dc

cases = [
    ["x", "x"],
    ["bad-name"],
    ["123"],
    ["class"],
]
for fields in cases:
    try:
        dc.make_dataclass("T", [(f, str, dc.field(default="")) for f in fields])
        print(fields, "accepted")
    except Exception as exc:
        print(fields, type(exc).__name__, exc)
PY
```

Also applies to: 293-309
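One shared validator, callable from both backends before registering a topic, could look like the sketch below (the function name is an assumption):

```python
import keyword


def validate_topic_fields(fields: list[str]) -> None:
    """Reject schemas that dataclasses.make_dataclass (and hence the
    CycloneDDS IdlStruct path) cannot build, so the mock backend fails
    in the same cases as the real one."""
    seen: set[str] = set()
    for name in fields:
        if not name.isidentifier() or keyword.iskeyword(name):
            raise ValueError(f"invalid field name: {name!r}")
        if name in seen:
            raise ValueError(f"duplicate field name: {name!r}")
        seen.add(name)
```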
```python
idl_type = _make_idl_type(name, fields)
self._idl_types[name] = idl_type

cqos = _build_cyclonedds_qos(qos)
topic = Topic(self._participant, name, idl_type, qos=cqos)
self._topics[name] = topic
self._qos_map[name] = qos
self._sample_counts[name] = 0

self._writers[name] = DataWriter(self._participant, topic, qos=cqos)
self._readers[name] = DataReader(self._participant, topic, qos=cqos)
```
Make topic registration atomic.
This method writes internal state before DataWriter and DataReader creation is known to succeed. If either constructor fails, the backend is left with a half-registered topic and the next create_topic() call returns "already exists". Only commit the dict updates after all DDS objects are created, or rollback on exception.
Possible fix
```diff
  idl_type = _make_idl_type(name, fields)
- self._idl_types[name] = idl_type
-
  cqos = _build_cyclonedds_qos(qos)
  topic = Topic(self._participant, name, idl_type, qos=cqos)
- self._topics[name] = topic
- self._qos_map[name] = qos
- self._sample_counts[name] = 0
-
- self._writers[name] = DataWriter(self._participant, topic, qos=cqos)
- self._readers[name] = DataReader(self._participant, topic, qos=cqos)
+ writer = None
+ try:
+     writer = DataWriter(self._participant, topic, qos=cqos)
+     reader = DataReader(self._participant, topic, qos=cqos)
+ except Exception:
+     if writer is not None:
+         writer.close()
+     topic.close()
+     raise
+
+ self._idl_types[name] = idl_type
+ self._topics[name] = topic
+ self._qos_map[name] = qos
+ self._sample_counts[name] = 0
+ self._writers[name] = writer
+ self._readers[name] = reader
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
idl_type = _make_idl_type(name, fields)
cqos = _build_cyclonedds_qos(qos)
topic = Topic(self._participant, name, idl_type, qos=cqos)

writer = None
try:
    writer = DataWriter(self._participant, topic, qos=cqos)
    reader = DataReader(self._participant, topic, qos=cqos)
except Exception:
    if writer is not None:
        writer.close()
    topic.close()
    raise

self._idl_types[name] = idl_type
self._topics[name] = topic
self._qos_map[name] = qos
self._sample_counts[name] = 0
self._writers[name] = writer
self._readers[name] = reader
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`
around lines 167 - 177, The code currently assigns to internal maps
(self._idl_types, self._topics, self._qos_map, self._sample_counts) before
constructing DDS objects, so if DataWriter or DataReader construction fails you
end up with a half-registered topic; change create_topic() to first build local
variables (idl_type via _make_idl_type, cqos via _build_cyclonedds_qos, create
Topic, then construct DataWriter and DataReader) and only after all
constructions succeed atomically assign those locals into self._idl_types,
self._topics, self._qos_map, self._sample_counts, self._writers and
self._readers (or alternatively catch exceptions and roll back any partial
assignments) so subsequent create_topic() calls won’t see a partially-registered
topic.
```python
def test_export_methods_match(self):
    def _exported(cls):
        return {
            name
            for name in dir(cls)
            if not name.startswith("_")
            and callable(getattr(cls, name, None))
            and hasattr(getattr(cls, name), "__wrapped__")
        }

    dds_methods = {
        name
        for name in vars(Dds)
        if not name.startswith("_") and hasattr(getattr(Dds, name, None), "__wrapped__")
    }
    mock_methods = {
        name
        for name in vars(MockDds)
        if not name.startswith("_") and hasattr(getattr(MockDds, name, None), "__wrapped__")
    }
    assert dds_methods == mock_methods, (
        f"Export mismatch: Dds has {dds_methods - mock_methods}, MockDds has {mock_methods - dds_methods}"
    )
```
Use the export markers here; __wrapped__ skips monitor().
@export marks methods with driver-call metadata, not __wrapped__. Since monitor() is exported without @validate_call, this test never compares the streaming method sets and can still pass after a public-surface regression.
Possible fix
```diff
  def _exported(cls):
      return {
          name
-         for name in dir(cls)
-         if not name.startswith("_")
-         and callable(getattr(cls, name, None))
-         and hasattr(getattr(cls, name), "__wrapped__")
+         for name, method in vars(cls).items()
+         if not name.startswith("_")
+         and (
+             getattr(method, MARKER_DRIVERCALL, None) == MARKER_MAGIC
+             or getattr(method, MARKER_STREAMING_DRIVERCALL, None) == MARKER_MAGIC
+         )
      }

- dds_methods = {
-     name for name in vars(Dds) if not name.startswith("_") and hasattr(getattr(Dds, name, None), "__wrapped__")
- }
- mock_methods = {
-     name
-     for name in vars(MockDds)
-     if not name.startswith("_") and hasattr(getattr(MockDds, name, None), "__wrapped__")
- }
+ dds_methods = _exported(Dds)
+ mock_methods = _exported(MockDds)
```

Use the marker constants from jumpstarter.driver.decorators for the helper above.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py`
around lines 101 - 120, The test_export_methods_match helper is incorrectly
detecting exported methods by checking for __wrapped__ (which misses monitor()
exported without `@validate_call`); update the helper used in
test_export_methods_match (the inner _exported and the dds_methods/mock_methods
sets) to use the official export marker constants from
jumpstarter.driver.decorators (instead of __wrapped__) so it detects methods
annotated with `@export` regardless of `@validate_call`; reference Dds, MockDds,
monitor(), and the export marker constants from jumpstarter.driver.decorators
when changing the membership checks.
Summary

- New jumpstarter-driver-dds package providing DDS (Data Distribution Service) pub/sub messaging via Eclipse CycloneDDS
- Real Dds driver (using CycloneDDS) and a MockDds driver with MockDdsBackend for testing without native DDS dependencies

Implementation Details

Architecture follows the same patterns as existing automotive drivers (XCP, gPTP, DoIP):

- common.py: Pydantic models for RPC-safe types (DdsTopicQos, DdsParticipantInfo, DdsSample, etc.)
- driver.py: DdsBackend (real CycloneDDS) + MockDdsBackend (in-memory) + Dds driver + MockDds driver
- client.py: DdsClient with model_validate deserialization and Click CLI
- conftest.py: StatefulDdsBackend with lifecycle rule enforcement + pytest fixtures
- driver_test.py: 6 test levels (models, backend unit, e2e mock, error paths, stateful, full workflows)

QoS support: Reliability (RELIABLE/BEST_EFFORT), Durability (VOLATILE/TRANSIENT_LOCAL/TRANSIENT/PERSISTENT), History depth

Dependency: cyclonedds>=0.10.0 (EPL-2.0 licensed, used as a runtime dependency)

Test plan

- make pkg-test-jumpstarter-driver-dds
- make lint-fix (All checks passed)

Made with Cursor