Skip to content

feat: add DDS driver using Eclipse CycloneDDS#393

Open
vtz wants to merge 4 commits intojumpstarter-dev:mainfrom
vtz:feat/dds-driver
Open

feat: add DDS driver using Eclipse CycloneDDS#393
vtz wants to merge 4 commits intojumpstarter-dev:mainfrom
vtz:feat/dds-driver

Conversation

@vtz
Copy link
Copy Markdown
Contributor

@vtz vtz commented Mar 29, 2026

Summary

  • Add jumpstarter-driver-dds package providing DDS (Data Distribution Service) pub/sub messaging via Eclipse CycloneDDS
  • Supports domain participation, topic management with configurable QoS (reliability, durability, history depth), publish/subscribe, streaming monitor, and a Click CLI interface
  • Includes both a real Dds driver (using CycloneDDS) and a MockDds driver with MockDdsBackend for testing without native DDS dependencies
  • Comprehensive test suite with 58 tests covering unit, e2e (gRPC boundary), error paths, and stateful lifecycle tests (connection, topic schema enforcement, pub/sub, reconnect, call log/audit trail)
  • Full documentation with README, exporter YAML example, and docs index update

Implementation Details

Architecture follows the same patterns as existing automotive drivers (XCP, gPTP, DoIP):

  • common.py: Pydantic models for RPC-safe types (DdsTopicQos, DdsParticipantInfo, DdsSample, etc.)
  • driver.py: DdsBackend (real CycloneDDS) + MockDdsBackend (in-memory) + Dds driver + MockDds driver
  • client.py: DdsClient with model_validate deserialization and Click CLI
  • conftest.py: StatefulDdsBackend with lifecycle rule enforcement + pytest fixtures
  • driver_test.py: 6 test levels (models, backend unit, e2e mock, error paths, stateful, full workflows)

QoS support: Reliability (RELIABLE/BEST_EFFORT), Durability (VOLATILE/TRANSIENT_LOCAL/TRANSIENT/PERSISTENT), History depth

Dependency: cyclonedds>=0.10.0 (EPL-2.0 licensed, used as a runtime dependency)

Test plan

  • All 58 tests pass (make pkg-test-jumpstarter-driver-dds)
  • Lint passes (make lint-fix -- All checks passed)
  • CI pipeline passes on GitHub Actions
  • Verify CycloneDDS wheel installs on Linux CI runners

Made with Cursor

@netlify
Copy link
Copy Markdown

netlify bot commented Mar 29, 2026

Deploy Preview for jumpstarter-docs failed. Why did it fail? →

Name Link
🔨 Latest commit 81a864b
🔍 Latest deploy log https://app.netlify.com/projects/jumpstarter-docs/deploys/69cd898b342f96000838d5b8

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 29, 2026

📝 Walkthrough

Walkthrough

Adds a new jumpstarter-driver-dds package: CycloneDDS-backed and mock DDS drivers, a DdsClient CLI, Pydantic schemas/enums, extensive tests (unit, E2E, regression), documentation and examples, packaging/pyproject and workspace integration.

Changes

Cohort / File(s) Summary
Documentation
python/docs/source/reference/package-apis/drivers/dds.md, python/docs/source/reference/package-apis/drivers/index.md, python/packages/jumpstarter-driver-dds/README.md
New DDS docs and index entry; README documents installation, config, QoS, public API, and examples.
Type Definitions
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
Added DDS enums and Pydantic models for QoS, participant/topic metadata, samples, publish/read results, plus a post-validator enforcing sample_count consistency.
Driver Implementation
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
New driver module with real DdsBackend (CycloneDDS integration) and MockDdsBackend, dynamic IDL type generation, QoS mapping, publish/read/monitor semantics, and Dds/MockDds wrappers.
Client Interface
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
DdsClient (DriverClient) exposing connect/disconnect, topic management, publish/read/monitor, participant info, streaming monitor generator, and Click CLI commands.
Testing & Fixtures
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py, python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py, python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py
Fixtures including StatefulDdsBackend, parametrized any_backend, stateful_client; comprehensive unit, E2E, regression, and stateful tests covering lifecycle, schema, QoS, error mappings, and CLI.
Package & Workspace
python/packages/jumpstarter-driver-dds/pyproject.toml, python/packages/jumpstarter-driver-dds/.gitignore, python/pyproject.toml
New package pyproject (deps: jumpstarter, cyclonedds>=0.10.0), entry-point jumpstarter.drivers:Dds, pytest config, .gitignore, and workspace mapping added.
Examples
python/packages/jumpstarter-driver-dds/examples/exporter.yaml
Added ExporterConfig example manifest showing driver instantiation and QoS defaults (domain_id: 0, RELIABLE, VOLATILE, history_depth: 10).

Sequence Diagram

sequenceDiagram
    participant User as User
    participant Client as DdsClient
    participant Driver as Dds / MockDds
    participant Backend as DdsBackend / MockDdsBackend
    participant CycloneDDS as CycloneDDS

    User->>Client: connect()
    Client->>Driver: connect()
    Driver->>Backend: connect()
    Backend->>CycloneDDS: (real) create DomainParticipant
    CycloneDDS-->>Backend: participant
    Backend-->>Driver: DdsParticipantInfo
    Driver-->>Client: DdsParticipantInfo

    User->>Client: create_topic(name, fields, qos)
    Client->>Driver: create_topic(...)
    Driver->>Backend: create_topic(...)
    Backend->>CycloneDDS: (real) generate IdlStruct + create Topic/Writer/Reader
    CycloneDDS-->>Backend: topic objects
    Backend-->>Driver: DdsTopicInfo
    Driver-->>Client: DdsTopicInfo

    User->>Client: publish(topic_name, data)
    Client->>Driver: publish(...)
    Driver->>Backend: publish(...)
    Backend->>CycloneDDS: (real) write(sample)
    CycloneDDS-->>Backend: success
    Backend-->>Driver: DdsPublishResult
    Driver-->>Client: DdsPublishResult

    User->>Client: read(topic_name)
    Client->>Driver: read(...)
    Driver->>Backend: read(...)
    Backend-->>Driver: DdsReadResult
    Driver-->>Client: DdsReadResult

    User->>Client: disconnect()
    Client->>Driver: disconnect()
    Driver->>Backend: disconnect()
    Backend->>CycloneDDS: cleanup (real)
    CycloneDDS-->>Backend: cleanup complete
    Backend-->>Driver: success
    Driver-->>Client: success
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

backport release-0.7

Suggested reviewers

  • mangelajo
  • NickCao
  • kirkbrauer

Poem

🐰 I hopped with enums, tests, and a mock so spry,

Topics and QoS now watch samples fly,
Cyclone hums softly, IDL types take shape,
CLI and docs ready — a small rabbit's clap!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 29.47% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: adding a DDS driver using Eclipse CycloneDDS as the primary feature.
Description check ✅ Passed The description is comprehensive and directly related to the changeset, detailing the new package, architecture, implementation, test plan, and outstanding items.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

🧹 Nitpick comments (8)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py (3)

396-412: monitor() accesses backend internals directly.

The method accesses self._backend._require_connected() and self._backend._topics directly. These are internal details that should be exposed through public methods for consistency.

♻️ Suggested improvement

Add a public method to backends:

def has_topic(self, name: str) -> bool:
    self._require_connected()
    return name in self._topics

Then:

-        self._backend._require_connected()
-        if topic_name not in self._backend._topics:
+        if not self._backend.has_topic(topic_name):
             raise ValueError(f"Topic '{topic_name}' not registered")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`
around lines 396 - 412, monitor currently accesses backend internals
(_require_connected and _topics) directly; add a public backend method
has_topic(name: str) -> bool that calls the backend's internal
_require_connected() and returns name in self._topics, then update monitor to
call backend.has_topic(topic_name) instead of inspecting _topics and remove the
direct call to _require_connected; keep using the existing public
backend.read(...) to fetch samples. Ensure the new has_topic is part of the
backend API so monitor only uses public methods (has_topic and read).

52-65: Dynamic IDL type creation assumes all fields are strings.

The _make_idl_type function creates dataclass fields all typed as str with default "". This limits the driver to string-only topic schemas. Consider documenting this limitation or supporting type annotations.

 def _make_idl_type(topic_name: str, fields: list[str]):
-    """Dynamically create a CycloneDDS IdlStruct type for the given fields.
+    """Dynamically create a CycloneDDS IdlStruct type for the given fields.

-    Each field name maps to a ``str`` type. The generated class name is
-    derived from the topic name to avoid collisions between topics.
+    Each field name maps to a ``str`` type. For complex types, consider
+    pre-defining IdlStruct classes. The generated class name is derived
+    from the topic name to avoid collisions between topics.
+
+    Note: This implementation only supports string fields. For mixed types,
+    define custom IdlStruct subclasses directly.
     """
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`
around lines 52 - 65, The function _make_idl_type currently forces every field
to be a str with default "", which prevents non-string schemas; change the API
to accept fields as either a list of names or a list of (name, type, [default])
tuples and build dc_fields accordingly in _make_idl_type: preserve the existing
behavior when items are plain strings (map to str, default ""), but when an item
is a tuple use the provided type and default (or omit default if not provided)
so dataclass fields have correct type annotations; update callers that pass
fields to supply types where needed and add or update a short docstring noting
the supported field formats and backward compatibility with the string-only
form.

325-331: Direct access to _backend._connected breaks encapsulation.

The close() method accesses _backend._connected directly instead of using the backend's public interface. Consider adding an is_connected property or method to the backend.

♻️ Proposed fix to improve encapsulation

Add to DdsBackend and MockDdsBackend:

`@property`
def is_connected(self) -> bool:
    return self._connected

Then update Dds.close():

 def close(self):
-    if self._backend._connected:
+    if self._backend.is_connected:
         try:
             self._backend.disconnect()
         except Exception:
             logger.warning("Failed to disconnect DDS backend", exc_info=True)
     super().close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`
around lines 325 - 331, The close() method currently breaks encapsulation by
reading _backend._connected directly; add a public read-only property
is_connected to the backend implementations (e.g., DdsBackend and
MockDdsBackend) that returns their internal _connected flag, then update
Dds.close (the close method shown) to call self._backend.is_connected instead of
accessing _backend._connected and retain the existing try/except around
self._backend.disconnect() and the subsequent super().close() call.
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py (1)

177-188: Method name collision: CLI monitor function shadows the class method.

The inner function monitor on line 179 has the same name as the class method self.monitor() on line 131. While this works because self.monitor() is accessed through the closure, it can be confusing during debugging. Consider renaming the CLI function to monitor_cmd or similar for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py`
around lines 177 - 188, The CLI inner function named monitor shadows the class
method self.monitor(), which is confusing; rename the CLI callback (e.g.,
monitor_cmd) and update the decorator reference (`@base.command`() bound function
name) and any uses so the CLI handler is distinct from the class method, keeping
the call to self.monitor() unchanged inside the new monitor_cmd function; ensure
click.option remains attached to the renamed function and update any tests or
references to the old monitor CLI name if present.
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py (3)

216-220: Temporary config file handle remains open while ptp4l runs.

The NamedTemporaryFile is created with delete=False but the file handle is kept open via self._config_file. After writing and flushing, the file descriptor remains open. On Windows this could prevent ptp4l from reading it; on Linux (the target), it works but holding the handle isn't necessary.

♻️ Suggested improvement to close handle after writing
-        self._config_file = tempfile.NamedTemporaryFile(
+        config_file = tempfile.NamedTemporaryFile(
             mode="w", suffix=".cfg", prefix="ptp4l_", delete=False
         )
-        self._config_file.write(config_content)
-        self._config_file.flush()
+        config_file.write(config_content)
+        config_file.close()
+        self._config_file_path = config_file.name

Then update line 232 and 297-303 to use self._config_file_path instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py`
around lines 216 - 220, The temporary config file handle self._config_file is
left open after writing; change the creation to write and flush then close the
NamedTemporaryFile and store its path in self._config_file_path (e.g.,
self._config_file_path = self._config_file.name) and remove/close
self._config_file to release the descriptor, then update any code that
referenced self._config_file (notably the code paths in start() and the ptp4l
invocation around the earlier lines) to use self._config_file_path when passing
the filename to ptp4l or other readers.

360-382: Stub implementations return empty/default values.

get_clock_identity() returns an empty string and get_parent_info() returns default values. These appear to be placeholder implementations. Consider adding a TODO comment or raising NotImplementedError if these are intended to be implemented later.

Would you like me to help implement these methods by parsing additional ptp4l output or querying the PTP management interface?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py`
around lines 360 - 382, The stub methods get_clock_identity and get_parent_info
currently return empty/default values; replace those placeholders by raising
NotImplementedError (with a clear message like "get_clock_identity not
implemented" / "get_parent_info not implemented") after the existing
self._require_started() call and add a TODO comment indicating the intended
implementation (e.g., parse ptp4l output or query PTP management interface) so
callers and CI know these are intentionally unimplemented; keep the existing
decorators (`@export`, `@validate_call`) and method signatures intact.

419-419: Magic number 100 for streaming iteration limit.

Both Gptp.read() and MockGptp.read() iterate 100 times before stopping. This hardcoded limit could be documented or made configurable.

Also applies to: 581-581

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py` at
line 419, The hardcoded magic number 100 used in the streaming loops of
Gptp.read and MockGptp.read should be replaced with a named, configurable limit:
introduce a constant or configurable attribute (e.g., STREAM_ITER_LIMIT or
self.max_stream_iterations) and use it instead of the literal 100 in both
Gptp.read and MockGptp.read; expose the value via constructor args or config so
callers/tests can override, update any places that rely on the default, and add
a brief docstring or comment explaining the purpose of the limit.
python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py (1)

576-594: Network namespace cleanup may leave orphaned resources on test failure.

The veth_pair fixture uses check=True for setup but check=False for teardown. If setup fails partway through (e.g., after creating ns-ptp-master but before creating the veth pair), the teardown won't clean up properly since it only deletes namespaces.

♻️ Suggested improvement for robust cleanup
     `@pytest.fixture`
     def veth_pair(self):
         import subprocess as sp
+        created_ns = []
         cmds = [
             "ip netns add ns-ptp-master",
             "ip netns add ns-ptp-slave",
             "ip link add veth-m type veth peer name veth-s",
             "ip link set veth-m netns ns-ptp-master",
             "ip link set veth-s netns ns-ptp-slave",
             "ip netns exec ns-ptp-master ip addr add 10.99.0.1/24 dev veth-m",
             "ip netns exec ns-ptp-slave ip addr add 10.99.0.2/24 dev veth-s",
             "ip netns exec ns-ptp-master ip link set veth-m up",
             "ip netns exec ns-ptp-slave ip link set veth-s up",
         ]
-        for cmd in cmds:
-            sp.run(cmd.split(), check=True)
-        yield ("ns-ptp-master", "veth-m", "ns-ptp-slave", "veth-s")
-        sp.run("ip netns del ns-ptp-master".split(), check=False)
-        sp.run("ip netns del ns-ptp-slave".split(), check=False)
+        try:
+            for cmd in cmds:
+                sp.run(cmd.split(), check=True)
+                if "netns add" in cmd:
+                    ns_name = cmd.split()[-1]
+                    created_ns.append(ns_name)
+            yield ("ns-ptp-master", "veth-m", "ns-ptp-slave", "veth-s")
+        finally:
+            for ns in reversed(created_ns):
+                sp.run(["ip", "netns", "del", ns], check=False)
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py`:
- Around line 50-55: The current list_topics method swallows malformed responses
by returning [] when raw isn't a list; change list_topics to validate the RPC
result: check that raw is a list and that each element can be parsed with
DdsTopicInfo.model_validate, and if not raise a clear exception (e.g.,
TypeError/ValueError) that includes the unexpected raw payload to surface
protocol/serialization bugs; keep the existing behavior of returning the list of
DdsTopicInfo instances when validation succeeds.
- Around line 74-85: The CLI helper _register_lifecycle_commands is registering
functions named connect_cmd and disconnect_cmd which produce unwanted CLI names
like connect-cmd; update the base.command() calls inside
_register_lifecycle_commands to pass explicit name arguments (e.g.
base.command(name="connect") for the connect_cmd function and
base.command(name="disconnect") for disconnect_cmd) so the CLI commands are
"connect" and "disconnect"; do the same pattern for other command functions that
end with _cmd (e.g. read_cmd → name="read", monitor_cmd → name="monitor") to
ensure correct kebab-case names without the trailing "-cmd".

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py`:
- Around line 69-82: In publish(self, topic_name, data) add a check that all
required fields declared in self._topic_fields[topic_name] are present in the
incoming data (not just rejecting unexpected keys): compute the set difference
between declared fields and data.keys(), and if any are missing raise
DdsTopicError with a message listing the missing fields and valid fields; keep
the existing unknown-field validation and the rest of the logic
(super().publish, _total_published, _call_log) unchanged.

In `@python/packages/jumpstarter-driver-dds/pyproject.toml`:
- Around line 23-25: The source_archive URL under
[tool.hatch.metadata.hooks.vcs.urls] in pyproject.toml uses the wrong repo slug;
update the source_archive value to point to the actual repository (replace
"https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" with
"https://github.com/jumpstarter-dev/jumpstarter/archive/{commit_hash}.zip") so
the generated source archive link is valid.
- Around line 11-14: The package manifest is missing the direct runtime
dependency on click even though jumpstarter_driver_dds/client.py imports and
uses click (e.g., command decorators and output calls); update
python/packages/jumpstarter-driver-dds/pyproject.toml by adding click (e.g.,
"click>=8.0" or your project's minimum supported click) to the dependencies list
so the package declares its own required runtime dependency.

In `@python/packages/jumpstarter-driver-dds/README.md`:
- Around line 101-105: The example uses the streaming API
dds.monitor("sensor/temperature") which never returns, so dds.disconnect() is
unreachable; update the README snippet to either show a bounded read (e.g.,
iterate a fixed number of samples) or wrap the monitor loop in try/finally to
ensure dds.disconnect() is called, or explicitly state that the example is
long-lived and must be interrupted manually; refer to the monitor() call and the
dds.disconnect() call when making the change.

In `@python/packages/jumpstarter-driver-gptp/examples/exporter.yaml`:
- Around line 4-13: The top-level key "drivers:" is deprecated and should be
renamed to "export:" to match the ExporterConfig schema; update the YAML so the
block beginning with "drivers:" is replaced with "export:" (keeping the nested
"gptp" entry, type jumpstarter_driver_gptp.driver.Gptp, and the existing config
fields like interface, domain, profile, transport, role, and sync_system_clock
unchanged) so the example validates against ExporterConfig.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py`:
- Around line 113-129: The wait_for_sync loop currently swallows all exceptions;
update it to catch only expected transient errors (e.g., ConnectionError,
TimeoutError, OSError) when calling self.is_synchronized(), log the caught
exception at debug or warning level (use self.logger if available or the logging
module), and re-raise any other unexpected exceptions so critical failures
aren't hidden; specifically replace the bare `except Exception: pass` in
wait_for_sync with a narrowed `except (ConnectionError, TimeoutError, OSError)
as e:` that logs and continues, and an `except Exception:` that re-raises.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py`:
- Around line 53-58: The override of stop() (MockGptpBackend.stop) clears
several states but does not reset the base backend's _offset_ns back to 0.0,
which changes post-stop semantics; fix by preserving the base behavior — either
call super().stop() at the start or explicitly set self._offset_ns = 0.0 in
MockGptpBackend.stop() (and keep the existing resets of _started, _port_state,
_servo_state and _call_log) so the last synced offset is not leaked into later
assertions.

In
`@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py`:
- Around line 611-626: The test test_gptp_real_sync_software_timestamping
instantiates Gptp(interface=slave_iface, ...) in the default namespace while the
slave interface was moved into ns-ptp-slave; update the test so the driver and
its ptp4l child run inside the slave namespace by either: invoking serve(driver)
under the namespace (e.g., run the serve/Gptp process with ip netns exec
ns-ptp-slave) or by using the same pattern as the ptp_master fixture to wrap the
driver startup in the slave namespace; ensure you reference the existing
veth_pair variables (slave_ns, slave_iface), the Gptp constructor, and the
serve(client) call so the spawned ptp4l can find slave_iface.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py`:
- Around line 249-251: Replace the call to
asyncio.get_event_loop().create_task(...) with the idiomatic
asyncio.create_task(...) when scheduling the reader coroutine: change the
creation of self._reader_task (currently assigned via
asyncio.get_event_loop().create_task(self._read_ptp4l_output())) to use
asyncio.create_task(self._read_ptp4l_output()) so the _read_ptp4l_output
coroutine is scheduled directly and more idiomatically.

In `@python/packages/jumpstarter-driver-gptp/pyproject.toml`:
- Around line 25-27: The package metadata's source_archive URL is using the
wrong repository slug; update the [tool.hatch.metadata.hooks.vcs.urls] entry
named source_archive to point to the correct GitHub repo for the gPTP package
(replace "jumpstarter-dev/repo" with the actual repository slug for this
project) so the published metadata links to a valid archive; edit the
source_archive value in pyproject.toml to use the correct repo path and keep the
{commit_hash}.zip placeholder.

---

Nitpick comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`:
- Around line 396-412: monitor currently accesses backend internals
(_require_connected and _topics) directly; add a public backend method
has_topic(name: str) -> bool that calls the backend's internal
_require_connected() and returns name in self._topics, then update monitor to
call backend.has_topic(topic_name) instead of inspecting _topics and remove the
direct call to _require_connected; keep using the existing public
backend.read(...) to fetch samples. Ensure the new has_topic is part of the
backend API so monitor only uses public methods (has_topic and read).
- Around line 52-65: The function _make_idl_type currently forces every field to
be a str with default "", which prevents non-string schemas; change the API to
accept fields as either a list of names or a list of (name, type, [default])
tuples and build dc_fields accordingly in _make_idl_type: preserve the existing
behavior when items are plain strings (map to str, default ""), but when an item
is a tuple use the provided type and default (or omit default if not provided)
so dataclass fields have correct type annotations; update callers that pass
fields to supply types where needed and add or update a short docstring noting
the supported field formats and backward compatibility with the string-only
form.
- Around line 325-331: The close() method currently breaks encapsulation by
reading _backend._connected directly; add a public read-only property
is_connected to the backend implementations (e.g., DdsBackend and
MockDdsBackend) that returns their internal _connected flag, then update
Dds.close (the close method shown) to call self._backend.is_connected instead of
accessing _backend._connected and retain the existing try/except around
self._backend.disconnect() and the subsequent super().close() call.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py`:
- Around line 177-188: The CLI inner function named monitor shadows the class
method self.monitor(), which is confusing; rename the CLI callback (e.g.,
monitor_cmd) and update the decorator reference (`@base.command`() bound function
name) and any uses so the CLI handler is distinct from the class method, keeping
the call to self.monitor() unchanged inside the new monitor_cmd function; ensure
click.option remains attached to the renamed function and update any tests or
references to the old monitor CLI name if present.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py`:
- Around line 216-220: The temporary config file handle self._config_file is
left open after writing; change the creation to write and flush then close the
NamedTemporaryFile and store its path in self._config_file_path (e.g.,
self._config_file_path = self._config_file.name) and remove/close
self._config_file to release the descriptor, then update any code that
referenced self._config_file (notably the code paths in start() and the ptp4l
invocation around the earlier lines) to use self._config_file_path when passing
the filename to ptp4l or other readers.
- Around line 360-382: The stub methods get_clock_identity and get_parent_info
currently return empty/default values; replace those placeholders by raising
NotImplementedError (with a clear message like "get_clock_identity not
implemented" / "get_parent_info not implemented") after the existing
self._require_started() call and add a TODO comment indicating the intended
implementation (e.g., parse ptp4l output or query PTP management interface) so
callers and CI know these are intentionally unimplemented; keep the existing
decorators (`@export`, `@validate_call`) and method signatures intact.
- Line 419: The hardcoded magic number 100 used in the streaming loops of
Gptp.read and MockGptp.read should be replaced with a named, configurable limit:
introduce a constant or configurable attribute (e.g., STREAM_ITER_LIMIT or
self.max_stream_iterations) and use it instead of the literal 100 in both
Gptp.read and MockGptp.read; expose the value via constructor args or config so
callers/tests can override, update any places that rely on the default, and add
a brief docstring or comment explaining the purpose of the limit.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1bd84cc6-4bca-4158-9c59-7c50d844f7bf

📥 Commits

Reviewing files that changed from the base of the PR and between 03fc412 and a1302eb.

⛔ Files ignored due to path filters (1)
  • python/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (24)
  • python/docs/source/reference/package-apis/drivers/dds.md
  • python/docs/source/reference/package-apis/drivers/gptp.md
  • python/docs/source/reference/package-apis/drivers/index.md
  • python/packages/jumpstarter-driver-dds/.gitignore
  • python/packages/jumpstarter-driver-dds/README.md
  • python/packages/jumpstarter-driver-dds/examples/exporter.yaml
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/__init__.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py
  • python/packages/jumpstarter-driver-dds/pyproject.toml
  • python/packages/jumpstarter-driver-gptp/.gitignore
  • python/packages/jumpstarter-driver-gptp/README.md
  • python/packages/jumpstarter-driver-gptp/examples/exporter.yaml
  • python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/__init__.py
  • python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py
  • python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/common.py
  • python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py
  • python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py
  • python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py
  • python/packages/jumpstarter-driver-gptp/pyproject.toml
  • python/pyproject.toml

Comment on lines +11 to +14
dependencies = [
"jumpstarter",
"cyclonedds>=0.10.0",
]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the DDS driver package location
fd -t f "pyproject.toml" | grep "jumpstarter-driver-dds"

Repository: jumpstarter-dev/jumpstarter

Length of output: 124


🏁 Script executed:

#!/bin/bash
# Read the client.py file to check if click is imported
if [ -f "python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py" ]; then
  cat -n "python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py"
fi

Repository: jumpstarter-dev/jumpstarter

Length of output: 5990


🏁 Script executed:

#!/bin/bash
# Read the full pyproject.toml for DDS driver
if [ -f "python/packages/jumpstarter-driver-dds/pyproject.toml" ]; then
  cat -n "python/packages/jumpstarter-driver-dds/pyproject.toml"
fi

Repository: jumpstarter-dev/jumpstarter

Length of output: 1499


🏁 Script executed:

#!/bin/bash
# Check if click is available from transitive dependencies (check jumpstarter package)
if [ -f "python/packages/jumpstarter/pyproject.toml" ]; then
  echo "=== jumpstarter package dependencies ==="
  cat -n "python/packages/jumpstarter/pyproject.toml" | grep -A 20 "dependencies"
fi

Repository: jumpstarter-dev/jumpstarter

Length of output: 754


🏁 Script executed:

#!/bin/bash
# Check pyserial driver for reference (mentioned in learnings as using transitive deps)
if [ -f "python/packages/jumpstarter-driver-pyserial/pyproject.toml" ]; then
  echo "=== pyserial package dependencies ==="
  cat -n "python/packages/jumpstarter-driver-pyserial/pyproject.toml" | grep -A 20 "dependencies"
fi

Repository: jumpstarter-dev/jumpstarter

Length of output: 711


Declare click as a direct runtime dependency.

python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py imports click directly (line 7) and uses it extensively in command decorators and output calls, but this package doesn't declare it in dependencies. The base jumpstarter package also doesn't provide it transitively. This violates the requirement that packages declare their own minimal, focused dependencies.

📦 Proposed manifest fix
 dependencies = [
     "jumpstarter",
+    "click",
     "cyclonedds>=0.10.0",
 ]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dependencies = [
"jumpstarter",
"cyclonedds>=0.10.0",
]
dependencies = [
"jumpstarter",
"click",
"cyclonedds>=0.10.0",
]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/pyproject.toml` around lines 11 - 14,
The package manifest is missing the direct runtime dependency on click even
though jumpstarter_driver_dds/client.py imports and uses click (e.g., command
decorators and output calls); update
python/packages/jumpstarter-driver-dds/pyproject.toml by adding click (e.g.,
"click>=8.0" or your project's minimum supported click) to the dependencies list
so the package declares its own required runtime dependency.

Comment on lines +113 to +129
def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool:
"""Block until PTP synchronization is achieved or timeout expires.

:param timeout: Maximum time to wait in seconds
:param poll_interval: Polling interval in seconds
:returns: True if synchronized, False if timeout expired
:rtype: bool
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
if self.is_synchronized():
return True
except Exception:
pass
time.sleep(poll_interval)
return False
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Overly broad exception swallowing in wait_for_sync may hide critical failures.

The bare except Exception: pass silently ignores all errors during polling, including connection failures, authentication issues, or driver crashes. Consider catching only expected transient errors:

🛡️ Proposed fix to narrow exception handling
 def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool:
     """Block until PTP synchronization is achieved or timeout expires.

     :param timeout: Maximum time to wait in seconds
     :param poll_interval: Polling interval in seconds
     :returns: True if synchronized, False if timeout expired
     :rtype: bool
     """
+    from jumpstarter.client.core import DriverError
+
     deadline = time.monotonic() + timeout
     while time.monotonic() < deadline:
         try:
             if self.is_synchronized():
                 return True
-        except Exception:
-            pass
+        except DriverError:
+            # Expected during startup/transient states
+            pass
         time.sleep(poll_interval)
     return False
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool:
"""Block until PTP synchronization is achieved or timeout expires.
:param timeout: Maximum time to wait in seconds
:param poll_interval: Polling interval in seconds
:returns: True if synchronized, False if timeout expired
:rtype: bool
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
if self.is_synchronized():
return True
except Exception:
pass
time.sleep(poll_interval)
return False
def wait_for_sync(self, timeout: float = 30.0, poll_interval: float = 1.0) -> bool:
"""Block until PTP synchronization is achieved or timeout expires.
:param timeout: Maximum time to wait in seconds
:param poll_interval: Polling interval in seconds
:returns: True if synchronized, False if timeout expired
:rtype: bool
"""
from jumpstarter.client.core import DriverError
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
if self.is_synchronized():
return True
except DriverError:
# Expected during startup/transient states
pass
time.sleep(poll_interval)
return False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/client.py`
around lines 113 - 129, The wait_for_sync loop currently swallows all
exceptions; update it to catch only expected transient errors (e.g.,
ConnectionError, TimeoutError, OSError) when calling self.is_synchronized(), log
the caught exception at debug or warning level (use self.logger if available or
the logging module), and re-raise any other unexpected exceptions so critical
failures aren't hidden; specifically replace the bare `except Exception: pass`
in wait_for_sync with a narrowed `except (ConnectionError, TimeoutError,
OSError) as e:` that logs and continues, and an `except Exception:` that
re-raises.

Comment on lines +53 to +58
def stop(self):
self.require_started()
self._started = False
self._port_state = "INITIALIZING"
self._servo_state = "s0"
self._call_log.append("stop")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Preserve the base backend's stop-state resets.

MockGptpBackend.stop() clears _offset_ns back to 0.0, but this override leaves the last synced offset behind. That changes the post-stop semantics of the drop-in backend and can leak stale state into later assertions.

♻️ Proposed fix
     def stop(self):
-        self.require_started()
-        self._started = False
-        self._port_state = "INITIALIZING"
-        self._servo_state = "s0"
+        super().stop()
         self._call_log.append("stop")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def stop(self):
self.require_started()
self._started = False
self._port_state = "INITIALIZING"
self._servo_state = "s0"
self._call_log.append("stop")
def stop(self):
super().stop()
self._call_log.append("stop")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/conftest.py`
around lines 53 - 58, The override of stop() (MockGptpBackend.stop) clears
several states but does not reset the base backend's _offset_ns back to 0.0,
which changes post-stop semantics; fix by preserving the base behavior — either
call super().stop() at the start or explicitly set self._offset_ns = 0.0 in
MockGptpBackend.stop() (and keep the existing resets of _started, _port_state,
_servo_state and _call_log) so the last synced offset is not leaked into later
assertions.

Comment on lines +611 to +626
def test_gptp_real_sync_software_timestamping(self, veth_pair, ptp_master):
import time
_, _, slave_ns, slave_iface = veth_pair
driver = Gptp(
interface=slave_iface, domain=0, profile="default",
transport="UDPv4", role="slave", sync_system_clock=False,
)
with serve(driver) as client:
client.start()
time.sleep(10)
status = client.status()
assert status.port_state == PortState.SLAVE
offset = client.get_offset()
assert abs(offset.offset_from_master_ns) < 10_000_000
assert client.is_synchronized() is True
client.stop()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether the test expects to run inside the namespace or if there's additional setup

# Check if there's any namespace execution wrapper in conftest or the driver
rg -n "netns exec|nsenter|setns" python/packages/jumpstarter-driver-gptp/

Repository: jumpstarter-dev/jumpstarter

Length of output: 688


🏁 Script executed:

# Get the full test function and surrounding context
sed -n '570,635p' python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py

Repository: jumpstarter-dev/jumpstarter

Length of output: 2641


🏁 Script executed:

# Find the veth_pair fixture definition
rg -n "def veth_pair|@pytest.fixture.*veth_pair" python/packages/jumpstarter-driver-gptp/ -A 20

Repository: jumpstarter-dev/jumpstarter

Length of output: 2760


🏁 Script executed:

# Check the serve function implementation
rg -n "def serve" python/packages/jumpstarter-driver-gptp/ -A 15

Repository: jumpstarter-dev/jumpstarter

Length of output: 53


🏁 Script executed:

# Search for serve function definition - it should be in the same test file
rg -n "def serve" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py -A 10

Repository: jumpstarter-dev/jumpstarter

Length of output: 53


🏁 Script executed:

# Check the Gptp class to see if it supports namespace handling
rg -n "class Gptp|def __init__" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py -A 20 | head -60

Repository: jumpstarter-dev/jumpstarter

Length of output: 2263


🏁 Script executed:

# Check if there's a namespace context manager or parameter in Gptp
rg -n "netns|namespace" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py

Repository: jumpstarter-dev/jumpstarter

Length of output: 53


🏁 Script executed:

# Search for serve function in conftest or test files
find python/packages/jumpstarter-driver-gptp -name "conftest.py" -o -name "*.py" | xargs grep -l "def serve" 2>/dev/null

Repository: jumpstarter-dev/jumpstarter

Length of output: 55


🏁 Script executed:

# Check imports at the top of driver_test.py
head -50 python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py

Repository: jumpstarter-dev/jumpstarter

Length of output: 1743


🏁 Script executed:

# Search for serve in the entire jumpstarter-driver-gptp package
rg -n "from .* import serve|def serve" python/packages/jumpstarter-driver-gptp/

Repository: jumpstarter-dev/jumpstarter

Length of output: 413


🏁 Script executed:

# Find and examine the serve function
find python -name "utils.py" -path "*/common/*" | xargs grep -l "def serve" 2>/dev/null

Repository: jumpstarter-dev/jumpstarter

Length of output: 126


🏁 Script executed:

# Get the serve function implementation
find python -name "utils.py" -path "*/common/*" -exec grep -A 30 "def serve" {} + | head -50

Repository: jumpstarter-dev/jumpstarter

Length of output: 1811


🏁 Script executed:

# Check Gptp.start() implementation to see how it accesses the interface
rg -n "async def start|def start" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py -A 20 | head -80

Repository: jumpstarter-dev/jumpstarter

Length of output: 2466


🏁 Script executed:

# Check if Gptp spawns subprocess for ptp4l and how it accesses the interface
rg -n "ptp4l|asyncio.subprocess" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py -B 2 -A 5 | head -100

Repository: jumpstarter-dev/jumpstarter

Length of output: 3606


🏁 Script executed:

# Confirm that interfaces moved to namespace are not accessible from default namespace
# by checking Linux network namespace behavior
cat << 'EOF' > /tmp/test_netns.py
# When an interface is moved to a namespace with:
# ip link set veth-s netns ns-ptp-slave
# That interface is NO LONGER visible in the default/host namespace
# ptp4l spawned in the host namespace will fail to find that interface

# This is fundamental Linux network namespace isolation behavior
print("Linux namespace isolation confirmed:")
print("1. veth-s moved to ns-ptp-slave via: ip link set veth-s netns ns-ptp-slave")
print("2. ptp4l spawned in default namespace CANNOT access veth-s")
print("3. ptp4l will fail with 'Cannot find interface' or similar error")
EOF
python /tmp/test_netns.py

Repository: jumpstarter-dev/jumpstarter

Length of output: 306


🏁 Script executed:

# Check if there are any other tests in the file that properly handle namespace execution
rg -n "ip netns exec|nsenter" python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py

Repository: jumpstarter-dev/jumpstarter

Length of output: 372


Driver must execute inside the slave network namespace.

The test moves veth-s into ns-ptp-slave (line 584: ip link set veth-s netns ns-ptp-slave), making it invisible in the default namespace. However, Gptp(interface=slave_iface, ...) is instantiated without namespace context, causing the spawned ptp4l process to fail finding the interface.

Compare with ptp_master fixture (lines 597-607), which correctly wraps ptp4l with ip netns exec ns-ptp-master. The test should wrap the driver invocation similarly using ip netns exec ns-ptp-slave or pass the namespace to serve() and Gptp.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver_test.py`
around lines 611 - 626, The test test_gptp_real_sync_software_timestamping
instantiates Gptp(interface=slave_iface, ...) in the default namespace while the
slave interface was moved into ns-ptp-slave; update the test so the driver and
its ptp4l child run inside the slave namespace by either: invoking serve(driver)
under the namespace (e.g., run the serve/Gptp process with ip netns exec
ns-ptp-slave) or by using the same pattern as the ptp_master fixture to wrap the
driver startup in the slave namespace; ensure you reference the existing
veth_pair variables (slave_ns, slave_iface), the Gptp constructor, and the
serve(client) call so the spawned ptp4l can find slave_iface.

Comment on lines +249 to +251
self._reader_task = asyncio.get_event_loop().create_task(
self._read_ptp4l_output()
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Is asyncio.get_event_loop() deprecated in Python 3.10+ when called from a coroutine?

💡 Result:

No, asyncio.get_event_loop is not deprecated when called from a coroutine in Python 3.10+. When called from a coroutine (or callback), it returns the running event loop without emitting a DeprecationWarning. The deprecation applies specifically to cases where there is no running event loop and no current loop set in the policy (typically synchronous code outside a running loop). The official documentation explicitly states: "When called from a coroutine or a callback (e.g. scheduled with call_soon or similar API), this function will always return the running event loop." The DeprecationWarning is emitted only if there is no running event loop and the policy's get_event_loop is called to create or retrieve one, which does not happen in a coroutine context where a loop is running. Instead, use asyncio.get_running_loop in coroutines as the preferred method, though get_event_loop continues to work without warning in that context.

Citations:


🏁 Script executed:

# Check the file to understand the context better
cd python/packages/jumpstarter-driver-gptp && head -n 260 jumpstarter_driver_gptp/driver.py | tail -n 20

Repository: jumpstarter-dev/jumpstarter

Length of output: 734


Use asyncio.create_task() directly instead of asyncio.get_event_loop().create_task().

In async methods, you can create tasks directly without explicitly accessing the event loop. This is the idiomatic approach and avoids unnecessary intermediate calls.

🐛 Proposed fix
-        self._reader_task = asyncio.get_event_loop().create_task(
+        self._reader_task = asyncio.create_task(
             self._read_ptp4l_output()
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/jumpstarter_driver_gptp/driver.py`
around lines 249 - 251, Replace the call to
asyncio.get_event_loop().create_task(...) with the idiomatic
asyncio.create_task(...) when scheduling the reader coroutine: change the
creation of self._reader_task (currently assigned via
asyncio.get_event_loop().create_task(self._read_ptp4l_output())) to use
asyncio.create_task(self._read_ptp4l_output()) so the _read_ptp4l_output
coroutine is scheduled directly and more idiomatically.

Comment on lines +25 to +27
[tool.hatch.metadata.hooks.vcs.urls]
Homepage = "https://jumpstarter.dev"
source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix the source_archive repository slug here too.

This points at jumpstarter-dev/repo, so the published metadata will link to a non-existent archive for the gPTP package as well.

🔗 Proposed metadata fix
-source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"
+source_archive = "https://github.com/jumpstarter-dev/jumpstarter/archive/{commit_hash}.zip"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
[tool.hatch.metadata.hooks.vcs.urls]
Homepage = "https://jumpstarter.dev"
source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"
[tool.hatch.metadata.hooks.vcs.urls]
Homepage = "https://jumpstarter.dev"
source_archive = "https://github.com/jumpstarter-dev/jumpstarter/archive/{commit_hash}.zip"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-gptp/pyproject.toml` around lines 25 - 27,
The package metadata's source_archive URL is using the wrong repository slug;
update the [tool.hatch.metadata.hooks.vcs.urls] entry named source_archive to
point to the correct GitHub repo for the gPTP package (replace
"jumpstarter-dev/repo" with the actual repository slug for this project) so the
published metadata links to a valid archive; edit the source_archive value in
pyproject.toml to use the correct repo path and keep the {commit_hash}.zip
placeholder.

Add jumpstarter-driver-dds package providing DDS (Data Distribution Service)
pub/sub messaging via Eclipse CycloneDDS. Includes domain participation,
topic management with configurable QoS, publish/subscribe, streaming
monitor, and a CLI interface.

Comprehensive test suite with 58 tests covering:
- Pydantic model validation
- MockDdsBackend unit tests
- Full gRPC e2e tests via MockDds driver
- Error path coverage
- Stateful lifecycle tests (connection, topic, pub/sub, reconnect)
- Call log/audit trail tests
- End-to-end workflow scenarios

Made-with: Cursor
@vtz vtz force-pushed the feat/dds-driver branch from a1302eb to 4799c64 Compare March 29, 2026 11:59
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py (1)

40-53: Consider whether _call_log should be cleared on disconnect.

disconnect() resets _total_published and _total_read but preserves _call_log. If the intent is to reset all session state, this is inconsistent; if the intent is to maintain a persistent audit trail across reconnects, this is correct but should be documented.

Looking at test_stateful_call_log_records_operations, the test expects the full log including disconnect, so the current behavior appears intentional for audit purposes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py`
around lines 40 - 53, The disconnect() method resets session counters but leaves
_call_log intact which is intentional per
test_stateful_call_log_records_operations; add a brief clarifying comment inside
disconnect (near the _call_log append) stating that _call_log is preserved
across disconnect/reconnect to serve as a persistent audit trail so future
readers understand this is deliberate (reference disconnect, _call_log, and
test_stateful_call_log_records_operations).
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py (1)

371-379: CLI test provides minimal smoke coverage.

The test verifies the CLI interface exists and has expected commands, but only checks topics and info. Consider expanding to verify all expected CLI commands (connect, disconnect, read, monitor) are present.

🔧 Suggested expansion
     def test_cli_interface(self):
         with serve(MockDds()) as client:
             cli = client.cli()
             assert hasattr(cli, "commands")
-            assert "topics" in cli.commands
-            assert "info" in cli.commands
+            # Verify all expected DDS CLI commands are registered
+            expected = {"connect", "disconnect", "topics", "info", "read", "monitor"}
+            assert expected.issubset(set(cli.commands.keys()))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py`
around lines 371 - 379, The test TestClientCli.test_cli_interface currently only
asserts that cli.commands contains "topics" and "info"; expand it to also assert
presence of the other expected CLI commands by adding assertions that "connect",
"disconnect", "read", and "monitor" are keys in cli.commands (leave the existing
checks for "topics" and "info" intact), locating the CLI via serve(MockDds())
and the cli variable used in this test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py`:
- Around line 40-53: The disconnect() method resets session counters but leaves
_call_log intact which is intentional per
test_stateful_call_log_records_operations; add a brief clarifying comment inside
disconnect (near the _call_log append) stating that _call_log is preserved
across disconnect/reconnect to serve as a persistent audit trail so future
readers understand this is deliberate (reference disconnect, _call_log, and
test_stateful_call_log_records_operations).

In
`@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py`:
- Around line 371-379: The test TestClientCli.test_cli_interface currently only
asserts that cli.commands contains "topics" and "info"; expand it to also assert
presence of the other expected CLI commands by adding assertions that "connect",
"disconnect", "read", and "monitor" are keys in cli.commands (leave the existing
checks for "topics" and "info" intact), locating the CLI via serve(MockDds())
and the cli variable used in this test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fb37f15a-e50c-44c6-a488-6d7775a082bc

📥 Commits

Reviewing files that changed from the base of the PR and between a1302eb and 4799c64.

⛔ Files ignored due to path filters (1)
  • python/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (13)
  • python/docs/source/reference/package-apis/drivers/dds.md
  • python/docs/source/reference/package-apis/drivers/index.md
  • python/packages/jumpstarter-driver-dds/.gitignore
  • python/packages/jumpstarter-driver-dds/README.md
  • python/packages/jumpstarter-driver-dds/examples/exporter.yaml
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/__init__.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py
  • python/packages/jumpstarter-driver-dds/pyproject.toml
  • python/pyproject.toml
✅ Files skipped from review due to trivial changes (7)
  • python/docs/source/reference/package-apis/drivers/dds.md
  • python/packages/jumpstarter-driver-dds/.gitignore
  • python/docs/source/reference/package-apis/drivers/index.md
  • python/packages/jumpstarter-driver-dds/examples/exporter.yaml
  • python/packages/jumpstarter-driver-dds/pyproject.toml
  • python/packages/jumpstarter-driver-dds/README.md
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
🚧 Files skipped from review as they are similar to previous changes (3)
  • python/pyproject.toml
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py

- Raise TypeError on malformed list_topics() response instead of silently returning []
- Add explicit name= to CLI commands to avoid _cmd suffixes
- Add is_connected property and has_topic() to backends for encapsulation
- Use public backend API in monitor() and close() instead of accessing internals
- Validate missing required fields in StatefulDdsBackend.publish()
- Document string-only field limitation in _make_idl_type
- Document _call_log persistence across disconnect in StatefulDdsBackend
- Expand CLI test to verify all registered commands
- Add comprehensive docstrings across all modules

Made-with: Cursor
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py (1)

436-454: Consider making the monitor iteration limit configurable.

The monitor() method has a hardcoded limit of 100 iterations with 0.1s sleep (total ~10s), which may not suit all use cases. The client-side monitor_cmd already has a --count option, but the server-side generator will stop after 100 iterations regardless.

Consider adding a max_iterations parameter or using an infinite loop, letting the client control termination via the streaming protocol's cancellation.

Proposed approach
     `@export`
-    async def monitor(self, topic_name: str) -> AsyncGenerator[DdsSample, None]:
+    async def monitor(self, topic_name: str, max_iterations: int = 0) -> AsyncGenerator[DdsSample, None]:
         """Stream data samples from a topic as they arrive.

-        Polls the topic reader periodically and yields new samples.
+        Polls the topic reader periodically and yields new samples.
+        If max_iterations is 0, polls indefinitely until cancelled.
         """
         import asyncio

         if not self._backend.is_connected:
             raise RuntimeError("Not connected -- call connect() first")
         if not self._backend.has_topic(topic_name):
             raise ValueError(f"Topic '{topic_name}' not registered")

-        for _ in range(100):
+        iterations = 0
+        while max_iterations == 0 or iterations < max_iterations:
             result = self._backend.read(topic_name, max_samples=10)
             for sample in result.samples:
                 yield sample
             await asyncio.sleep(0.1)
+            iterations += 1
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`
around lines 436 - 454, The monitor method currently stops after a hardcoded 100
iterations; change its signature (monitor) to accept an optional max_iterations:
Optional[int] = None (or max_iterations: int = 100) and use that parameter
instead of the literal 100 in the for loop, treating None as infinite looping
(e.g., while True) so the generator only stops when max_iterations is reached or
when the caller cancels the stream; ensure you keep existing checks
(_backend.is_connected, _backend.has_topic) and continue calling
_backend.read(topic_name, max_samples=10) with the same await asyncio.sleep(0.1)
behavior so existing consumers (and monitor_cmd) can control termination.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`:
- Around line 436-454: The monitor method currently stops after a hardcoded 100
iterations; change its signature (monitor) to accept an optional max_iterations:
Optional[int] = None (or max_iterations: int = 100) and use that parameter
instead of the literal 100 in the for loop, treating None as infinite looping
(e.g., while True) so the generator only stops when max_iterations is reached or
when the caller cancels the stream; ensure you keep existing checks
(_backend.is_connected, _backend.has_topic) and continue calling
_backend.read(topic_name, max_samples=10) with the same await asyncio.sleep(0.1)
behavior so existing consumers (and monitor_cmd) can control termination.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 233df602-d076-4ad0-9ddd-15fd9e9cfe38

📥 Commits

Reviewing files that changed from the base of the PR and between 4799c64 and 36253b9.

📒 Files selected for processing (4)
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py

Copy link
Copy Markdown
Contributor

@raballew raballew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the other PRs, replace asyncio with anyio as its the project convention. Also look at the comments below.

if key not in fields:
raise ValueError(f"Unknown field '{key}' for topic '{topic_name}'")

sample = DdsSample(topic_name=topic_name, data=data, timestamp=time.time())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line stores the partial data dict as-is, but the real DdsBackend.publish at line 178 does idl_type(**data) which fills in missing fields with empty strings (via the dataclass defaults at line 65). So if you publish {"x": "10"} to a topic with fields ["x", "y"]:

  • Real backend: a consumer reads {"x": "10", "y": ""}
  • Mock backend: a consumer reads {"x": "10"}

Also, unknown keys raise TypeError from the dataclass constructor in the real backend (line 178) but ValueError here (line 300). Code that catches ValueError will pass in tests but miss the TypeError in production.

And publishing {} succeeds silently in DdsBackend (all defaults), stores an empty dict here, and raises DdsTopicError in StatefulDdsBackend (conftest.py:84-88). Three backends, three behaviors.

The root cause is that there is no shared Protocol or ABC between backends (line 347 uses a union type). Consider filling defaults in mock publish to match real behavior, and normalizing exception types.

history_depth: int | None = None,
) -> DdsTopicInfo:
"""Create a topic on the mock backend."""
qos = DdsTopicQos()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses bare DdsTopicQos() (Pydantic defaults), but Dds.create_topic at line 403 uses self._default_qos() which reads from the driver's config fields (default_reliability, default_durability, default_history_depth at lines 342-344). MockDds (lines 467-468) has none of those config fields and no _default_qos() method.

So if someone configures Dds(default_reliability="BEST_EFFORT"), topics get BEST_EFFORT QoS. But MockDds always creates topics with RELIABLE QoS regardless of any config. Tests using MockDds with non-default QoS silently diverge from production.

Could you add the three config fields and a _default_qos() to MockDds, and call it here instead of DdsTopicQos()?



@dataclass(kw_only=True, config=ConfigDict(arbitrary_types_allowed=True))
class MockDds(Driver):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dds.close() (lines 363-370) explicitly calls self._backend.disconnect() before super().close(). MockDds has no close() override, so the internal backend's disconnect() is never called on teardown. Not a resource leak per se (no native resources), but it is a lifecycle asymmetry between the two driver classes. Adding a close() that mirrors the Dds pattern would be a small fix.


from cyclonedds.idl import IdlStruct

cls_name = topic_name.replace("/", "_").replace("-", "_").replace(".", "_") + "_Type"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sanitization maps distinct topic names to the same Python class name. For example, "sensor/temp" and "sensor-temp" both produce "sensor_temp_Type". The wire-level DDS topic identity is fine (CycloneDDS uses the raw name string at line 144), so this won't cause data corruption on the bus. But at the Python level you get confusing debugging and potential namespace issues.

Also, no validation that the result is a valid Python identifier. Something like "123-topic" would produce "123_topic_Type" which is not a valid class name.

A hash suffix or reversible encoding would prevent collisions, plus an str.isidentifier() check would catch invalid names early.

"""Tear down all DDS entities and release the participant."""
if not self._connected:
raise RuntimeError("Not connected to DDS domain")
self._writers.clear()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clears the Python dicts and sets self._participant = None, but never calls .close() on the CycloneDDS DataWriter, DataReader, Topic, or DomainParticipant objects. CycloneDDS wraps C objects, and while it likely implements __del__ cleanup, relying on GC for native resource release is fragile (especially with reference cycles or interpreter shutdown).

Calling .close() on writers, readers, topics, and participant (in reverse creation order) before clearing the dicts would be the safe approach.

) -> DdsTopicInfo:
"""Create a DDS topic with the given schema and QoS."""
return DdsTopicInfo.model_validate(
self.call("create_topic", name, fields, reliability, durability, history_depth)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing name, fields, reliability, durability, history_depth as positional args to self.call() is fragile. If the server-side method signature changes parameter order, this silently breaks. Using keyword arguments would be more resilient across the gRPC remoting layer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point, though a change here would mean a framework change as DriverClient.call() has signature call(method, *args) — positional-only is the project-wide convention used by all drivers. This would need to support kwargs.

DdsTopicInfo(
name=name,
fields=fields,
qos=self._qos_map.get(name, DdsTopicQos()),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both self._qos_map.get(name, DdsTopicQos()) here and self._sample_counts.get(name, 0) at line 166 can never hit their defaults because create_topic() always populates both dicts. Same pattern at line 180. The .get() fallbacks mask potential bugs: if a topic somehow ends up in _topics but not in _qos_map, you'd silently get wrong QoS instead of a KeyError that would surface the bug.


topic_name: str
samples: list[DdsSample] = []
sample_count: int = 0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing prevents creating DdsReadResult(samples=[s1, s2], sample_count=0). A Pydantic model_validator enforcing sample_count == len(samples) (or just making sample_count computed) would catch structural inconsistencies early.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Added a Pydantic model_validator that enforces sample_count == len(samples).

if self._backend.is_connected:
try:
self._backend.disconnect()
except Exception:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catches all exceptions during disconnect, including things like AttributeError or TypeError that would indicate real bugs rather than expected cleanup failures. A narrower catch (e.g., RuntimeError or the specific CycloneDDS exception types) would let actual programming errors propagate.


@base.command(name="monitor")
@click.argument("topic_name")
@click.option("--count", "-n", default=10, help="Number of events")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option suggests the user can control how many events to receive, but the server-side monitor() (driver.py:449) always exits after 100 iterations regardless. If someone passes --count 500, they will get at most 100 iterations worth of samples with no indication that the server stopped early. Worth either documenting the ceiling or making the server-side limit configurable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server now polls indefinitely (no 100-iteration cap), so --count works as advertised.

vtz added 2 commits April 1, 2026 17:04
- Replace asyncio with anyio (project convention)
- Normalize publish behavior: mock backend now fills missing fields with
  "" defaults and raises ValueError for unknown fields, matching the
  real CycloneDDS dataclass behavior
- Add QoS config fields and _default_qos() to MockDds for parity with Dds
- Add close() override to MockDds to mirror Dds lifecycle
- Explicitly close CycloneDDS writers/readers/topics/participant in
  disconnect() instead of relying on GC
- Make monitor() max_iterations configurable (0 = unlimited), add
  mid-loop disconnect handling for graceful exit
- Add create-topic and publish CLI commands
- Pin cyclonedds>=0.10.0,<1.0 to prevent native compat breakage
- Fix _make_idl_type name collisions with hash suffix and isidentifier check
- Add Dds(use_mock=True) test coverage
- Use dataclasses.fields() instead of __dataclass_fields__
- Remove always-True success field from DdsPublishResult
- Add DdsReadResult model_validator for sample_count consistency
- Remove .get() fallbacks on always-populated dicts
- Narrow close() exception catch to RuntimeError
- Document read/monitor mutual exclusivity on same topic
- Fix source_archive URL in pyproject.toml
- Fix README monitor example to show bounded usage
- Align StatefulDdsBackend enforcement with real/mock backends
Add parameterized backend parity tests (mock + stateful), driver
interface parity checks, CLI completeness assertions, Pydantic model
invariant guards, _make_idl_type collision tests, and an asyncio
convention enforcer to prevent recurrence of the issues identified
during code review.
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (1)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py (1)

118-137: ⚠️ Potential issue | 🟠 Major

Don't hide teardown bugs behind except Exception: pass.

These handlers suppress every cleanup failure and still mark the backend disconnected, so an AttributeError or TypeError in shutdown looks the same as a clean teardown. Catch only the expected DDS shutdown exceptions and log them; let unexpected bugs surface.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`
around lines 118 - 137, The teardown currently swallows all errors by using bare
"except Exception: pass" for writer.close(), reader.close(), topic.close(), and
participant.close(), so change each broad except to explicitly handle known DDS
shutdown exceptions (e.g., the DDS library's specific exception class) and log
them, while letting unexpected exceptions propagate; specifically, in the loops
over self._writers, self._readers, self._topics and the self._participant close
block, replace "except Exception: pass" with "except <DDSShutdownException> as
e: <logger>.error(..., exc_info=e)" (or logging.exception) and add a final
"except Exception: raise" to surface programming errors. Ensure you use the
existing logger (e.g., self._logger) or the logging module and reference the
same close() calls and the attributes _writers, _readers, _topics, and
_participant so the behavior is consistent.
🧹 Nitpick comments (1)
python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py (1)

1-224: Consider folding this suite into driver_test.py.

Keeping driver tests split across driver_test.py and regression_test.py makes the canonical package test entrypoint less obvious than the rest of the driver packages. Based on learnings, "Each driver package must include comprehensive tests in driver_test.py."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py`
around lines 1 - 224, The regression_test.py suite should be folded into
driver_test.py to centralize driver tests; move the tests (classes
TestBackendParity, TestDriverInterfaceParity, TestCliCompleteness,
TestModelInvariants, TestIdlTypeCollisions, TestProjectConventions and their
test_* methods) into driver_test.py, update imports from .driver and .common
inside driver_test.py (e.g., Dds, MockDds, _make_idl_type, DdsReadResult,
DdsSample, DdsPublishResult, DdsTopicQos, serve), remove or delete
regression_test.py, and run pytest to ensure discovery and that no duplicate
test names or import conflicts remain (adjust any relative imports or fixtures
like any_backend as needed).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py`:
- Around line 25-30: DdsTopicQos currently allows zero/negative history_depth;
enforce history_depth >= 1 by adding validation to the DdsTopicQos model: either
change the field declaration to use pydantic Field with ge=1 and default 10
(e.g., history_depth: int = Field(10, ge=1)) or add a pydantic `@validator` for
"history_depth" that raises a ValueError if value < 1, so invalid values are
rejected before reaching the mock or real DDS backends.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`:
- Around line 167-177: The code currently assigns to internal maps
(self._idl_types, self._topics, self._qos_map, self._sample_counts) before
constructing DDS objects, so if DataWriter or DataReader construction fails you
end up with a half-registered topic; change create_topic() to first build local
variables (idl_type via _make_idl_type, cqos via _build_cyclonedds_qos, create
Topic, then construct DataWriter and DataReader) and only after all
constructions succeed atomically assign those locals into self._idl_types,
self._topics, self._qos_map, self._sample_counts, self._writers and
self._readers (or alternatively catch exceptions and roll back any partial
assignments) so subsequent create_topic() calls won’t see a partially-registered
topic.

In
`@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py`:
- Around line 101-120: The test_export_methods_match helper is incorrectly
detecting exported methods by checking for __wrapped__ (which misses monitor()
exported without `@validate_call`); update the helper used in
test_export_methods_match (the inner _exported and the dds_methods/mock_methods
sets) to use the official export marker constants from
jumpstarter.driver.decorators (instead of __wrapped__) so it detects methods
annotated with `@export` regardless of `@validate_call`; reference Dds, MockDds,
monitor(), and the export marker constants from jumpstarter.driver.decorators
when changing the membership checks.

---

Duplicate comments:
In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`:
- Around line 118-137: The teardown currently swallows all errors by using bare
"except Exception: pass" for writer.close(), reader.close(), topic.close(), and
participant.close(), so change each broad except to explicitly handle known DDS
shutdown exceptions (e.g., the DDS library's specific exception class) and log
them, while letting unexpected exceptions propagate; specifically, in the loops
over self._writers, self._readers, self._topics and the self._participant close
block, replace "except Exception: pass" with "except <DDSShutdownException> as
e: <logger>.error(..., exc_info=e)" (or logging.exception) and add a final
"except Exception: raise" to surface programming errors. Ensure you use the
existing logger (e.g., self._logger) or the logging module and reference the
same close() calls and the attributes _writers, _readers, _topics, and
_participant so the behavior is consistent.

---

Nitpick comments:
In
`@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py`:
- Around line 1-224: The regression_test.py suite should be folded into
driver_test.py to centralize driver tests; move the tests (classes
TestBackendParity, TestDriverInterfaceParity, TestCliCompleteness,
TestModelInvariants, TestIdlTypeCollisions, TestProjectConventions and their
test_* methods) into driver_test.py, update imports from .driver and .common
inside driver_test.py (e.g., Dds, MockDds, _make_idl_type, DdsReadResult,
DdsSample, DdsPublishResult, DdsTopicQos, serve), remove or delete
regression_test.py, and run pytest to ensure discovery and that no duplicate
test names or import conflicts remain (adjust any relative imports or fixtures
like any_backend as needed).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 211b6b58-18ac-4144-8346-c7279b4c1f26

📥 Commits

Reviewing files that changed from the base of the PR and between 36253b9 and 81a864b.

⛔ Files ignored due to path filters (1)
  • python/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • python/packages/jumpstarter-driver-dds/README.md
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/client.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/conftest.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py
  • python/packages/jumpstarter-driver-dds/pyproject.toml
✅ Files skipped from review due to trivial changes (3)
  • python/packages/jumpstarter-driver-dds/pyproject.toml
  • python/packages/jumpstarter-driver-dds/README.md
  • python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver_test.py

Comment on lines +25 to +30
class DdsTopicQos(BaseModel):
"""Quality of Service settings for a DDS topic."""

reliability: DdsReliability = DdsReliability.RELIABLE
durability: DdsDurability = DdsDurability.VOLATILE
history_depth: int = 10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject non-positive history_depth.

Zero or negative depths are accepted here today. In the mock backend that turns trimming into buggy buf[-history_depth:] slicing; in the real backend the same value is forwarded into DDS QoS. Please enforce history_depth >= 1 before the value reaches either backend.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/common.py`
around lines 25 - 30, DdsTopicQos currently allows zero/negative history_depth;
enforce history_depth >= 1 by adding validation to the DdsTopicQos model: either
change the field declaration to use pydantic Field with ge=1 and default 10
(e.g., history_depth: int = Field(10, ge=1)) or add a pydantic `@validator` for
"history_depth" that raises a ValueError if value < 1, so invalid values are
rejected before reaching the mock or real DDS backends.

Comment on lines +54 to +74
def _make_idl_type(topic_name: str, fields: list[str]):
"""Dynamically create a CycloneDDS IdlStruct type for the given fields.

Each field name maps to a ``str`` type. For complex or mixed-type
schemas, define custom IdlStruct subclasses directly and register
them with the backend.

A hash suffix is appended to the generated class name to prevent
collisions when distinct topic names sanitise to the same identifier
(e.g. ``"sensor/temp"`` and ``"sensor-temp"``).
"""
from cyclonedds.idl import IdlStruct

sanitised = topic_name.replace("/", "_").replace("-", "_").replace(".", "_")
hash_suffix = hashlib.md5(topic_name.encode()).hexdigest()[:8]
cls_name = f"{sanitised}_{hash_suffix}_Type"
if not cls_name.isidentifier():
cls_name = f"Topic_{hash_suffix}_Type"
dc_fields = [(f, str, dc.field(default="")) for f in fields]
idl_cls = dc.make_dataclass(cls_name, dc_fields, bases=(IdlStruct,))
return idl_cls
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate topic field names consistently across both backends.

_make_idl_type() depends on Python dataclass field rules, so duplicate names or non-identifiers/keywords will fail on the CycloneDDS path. MockDdsBackend.create_topic() currently accepts those schemas, which lets mock-backed tests pass on topics that production cannot create. Add one shared field validator and call it from both backends before registering the topic.

You can verify the Python-side constraint with:

#!/bin/bash
python - <<'PY'
import dataclasses as dc

cases = [
    ["x", "x"],
    ["bad-name"],
    ["123"],
    ["class"],
]

for fields in cases:
    try:
        dc.make_dataclass("T", [(f, str, dc.field(default="")) for f in fields])
        print(fields, "accepted")
    except Exception as exc:
        print(fields, type(exc).__name__, exc)
PY

Also applies to: 293-309

Comment on lines +167 to +177
idl_type = _make_idl_type(name, fields)
self._idl_types[name] = idl_type

cqos = _build_cyclonedds_qos(qos)
topic = Topic(self._participant, name, idl_type, qos=cqos)
self._topics[name] = topic
self._qos_map[name] = qos
self._sample_counts[name] = 0

self._writers[name] = DataWriter(self._participant, topic, qos=cqos)
self._readers[name] = DataReader(self._participant, topic, qos=cqos)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make topic registration atomic.

This method writes internal state before DataWriter and DataReader creation is known to succeed. If either constructor fails, the backend is left with a half-registered topic and the next create_topic() call returns "already exists". Only commit the dict updates after all DDS objects are created, or rollback on exception.

Possible fix
         idl_type = _make_idl_type(name, fields)
-        self._idl_types[name] = idl_type
-
         cqos = _build_cyclonedds_qos(qos)
         topic = Topic(self._participant, name, idl_type, qos=cqos)
-        self._topics[name] = topic
-        self._qos_map[name] = qos
-        self._sample_counts[name] = 0
-
-        self._writers[name] = DataWriter(self._participant, topic, qos=cqos)
-        self._readers[name] = DataReader(self._participant, topic, qos=cqos)
+        writer = None
+        try:
+            writer = DataWriter(self._participant, topic, qos=cqos)
+            reader = DataReader(self._participant, topic, qos=cqos)
+        except Exception:
+            if writer is not None:
+                writer.close()
+            topic.close()
+            raise
+
+        self._idl_types[name] = idl_type
+        self._topics[name] = topic
+        self._qos_map[name] = qos
+        self._sample_counts[name] = 0
+        self._writers[name] = writer
+        self._readers[name] = reader
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
idl_type = _make_idl_type(name, fields)
self._idl_types[name] = idl_type
cqos = _build_cyclonedds_qos(qos)
topic = Topic(self._participant, name, idl_type, qos=cqos)
self._topics[name] = topic
self._qos_map[name] = qos
self._sample_counts[name] = 0
self._writers[name] = DataWriter(self._participant, topic, qos=cqos)
self._readers[name] = DataReader(self._participant, topic, qos=cqos)
idl_type = _make_idl_type(name, fields)
cqos = _build_cyclonedds_qos(qos)
topic = Topic(self._participant, name, idl_type, qos=cqos)
writer = None
try:
writer = DataWriter(self._participant, topic, qos=cqos)
reader = DataReader(self._participant, topic, qos=cqos)
except Exception:
if writer is not None:
writer.close()
topic.close()
raise
self._idl_types[name] = idl_type
self._topics[name] = topic
self._qos_map[name] = qos
self._sample_counts[name] = 0
self._writers[name] = writer
self._readers[name] = reader
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/driver.py`
around lines 167 - 177, The code currently assigns to internal maps
(self._idl_types, self._topics, self._qos_map, self._sample_counts) before
constructing DDS objects, so if DataWriter or DataReader construction fails you
end up with a half-registered topic; change create_topic() to first build local
variables (idl_type via _make_idl_type, cqos via _build_cyclonedds_qos, create
Topic, then construct DataWriter and DataReader) and only after all
constructions succeed atomically assign those locals into self._idl_types,
self._topics, self._qos_map, self._sample_counts, self._writers and
self._readers (or alternatively catch exceptions and roll back any partial
assignments) so subsequent create_topic() calls won’t see a partially-registered
topic.

Comment on lines +101 to +120
def test_export_methods_match(self):
def _exported(cls):
return {
name
for name in dir(cls)
if not name.startswith("_")
and callable(getattr(cls, name, None))
and hasattr(getattr(cls, name), "__wrapped__")
}

dds_methods = {
name for name in vars(Dds) if not name.startswith("_") and hasattr(getattr(Dds, name, None), "__wrapped__")
}
mock_methods = {
name
for name in vars(MockDds)
if not name.startswith("_") and hasattr(getattr(MockDds, name, None), "__wrapped__")
}
assert dds_methods == mock_methods, (
f"Export mismatch: Dds has {dds_methods - mock_methods}, MockDds has {mock_methods - dds_methods}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use the export markers here; __wrapped__ skips monitor().

@export marks methods with driver-call metadata, not __wrapped__. Since monitor() is exported without @validate_call, this test never compares the streaming method sets and can still pass after a public-surface regression.

Possible fix
         def _exported(cls):
             return {
                 name
-                for name in dir(cls)
-                if not name.startswith("_")
-                and callable(getattr(cls, name, None))
-                and hasattr(getattr(cls, name), "__wrapped__")
+                for name, method in vars(cls).items()
+                if not name.startswith("_")
+                and (
+                    getattr(method, MARKER_DRIVERCALL, None) == MARKER_MAGIC
+                    or getattr(method, MARKER_STREAMING_DRIVERCALL, None) == MARKER_MAGIC
+                )
             }
 
-        dds_methods = {
-            name for name in vars(Dds) if not name.startswith("_") and hasattr(getattr(Dds, name, None), "__wrapped__")
-        }
-        mock_methods = {
-            name
-            for name in vars(MockDds)
-            if not name.startswith("_") and hasattr(getattr(MockDds, name, None), "__wrapped__")
-        }
+        dds_methods = _exported(Dds)
+        mock_methods = _exported(MockDds)

Use the marker constants from jumpstarter.driver.decorators for the helper above.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@python/packages/jumpstarter-driver-dds/jumpstarter_driver_dds/regression_test.py`
around lines 101 - 120, The test_export_methods_match helper is incorrectly
detecting exported methods by checking for __wrapped__ (which misses monitor()
exported without `@validate_call`); update the helper used in
test_export_methods_match (the inner _exported and the dds_methods/mock_methods
sets) to use the official export marker constants from
jumpstarter.driver.decorators (instead of __wrapped__) so it detects methods
annotated with `@export` regardless of `@validate_call`; reference Dds, MockDds,
monitor(), and the export marker constants from jumpstarter.driver.decorators
when changing the membership checks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants