Conversation
Prepare the codebase for Zenoh integration without changing behavior. All existing tests pass (1401 passed, 3 xfailed for Phase 2 stubs). - Add `transport` field to GlobalConfig (default: "lcm") - Add ZENOH_AVAILABLE guard in transport.py - Branch _get_transport_for() on global_config.transport - Gate LCM configurators to only run when transport is "lcm" - Add ZenohTransport/pZenohTransport behind ZENOH_AVAILABLE guard - Add zenohpubsub.py stub (raises NotImplementedError) - Add `zenoh` optional dependency group in pyproject.toml - Add test_zenoh_transport.py covering all new conditional branches Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TDD: tests written first, then implementation. Follows DDSService pattern — module-level session dict with lock. - ZenohConfig with mode/connect/listen fields and session_key - ZenohService.start() opens session if not exists for config - ZenohService.stop() does NOT close shared session - session property raises RuntimeError if not started - Two services with same config share one session (8 tests pass) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TDD: tests written first, then implementation. - ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes]) - Publisher caching per key expression (avoids re-declaring) - Subscriber tracking for cleanup on stop() - Idempotent unsubscribe (guards against Zenoh ZError) - subscribe_all() via dimos/** wildcard - Zenoh and PickleZenoh composed classes (encoder mixins) - 7 unit tests pass Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both encoder-composed variants pass all spec conformance tests: - test_store, test_multiple_subscribers, test_unsubscribe - test_multiple_messages, test_async_iterator - 25 total tests pass (10 new Zenoh tests) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove xfail markers — Phase 2 stubs are now real implementations. Add transport wrapper integration tests for broadcast/subscribe. - ZenohTransport wraps Zenoh (LCM-encoded) with DDSTransport pattern - pZenohTransport wraps PickleZenoh with Topic wrapping for pubsub layer - Auto-start on first broadcast, stop/restart lifecycle - 16 tests pass (4 new wrapper tests) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Zenoh appears alongside LCM, SHM in benchmark heatmaps. Results: competitive with LCM for localhost — 82-149k msgs/sec for small messages, 0% message loss, <1ms latency. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Transport-level errors (session closed, invalid key expression) are logged but not raised. Delivery guarantees are handled by Zenoh's reliability protocol, not by exception propagation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix dimensionalOS#3: unsubscribe() now only calls undeclare() if it successfully removed the subscriber from the list. If stop() already cleared the list, unsubscribe() returns without double-undeclaring. - Fix dimensionalOS#5: on_sample callback wraps payload.to_bytes() in try/except to prevent malformed payloads from crashing Zenoh's internal thread. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Check membership before removing instead of catching ValueError. Reads more clearly and avoids using exceptions for control flow. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Two issues prevented the Rerun bridge from showing data over Zenoh: 1. The bridge hardcoded LCM() as its pubsub. Now resolves lazily at start() using self.config.g.transport from the worker's GlobalConfig. 2. Zenoh key expressions cannot contain '#' (forbidden character). Type info is now embedded as a '/' segment in the key expression (e.g., dimos/pointcloud/sensor_msgs.PointCloud2). _key_expr_to_topic reconstructs the Topic with lcm_type for subscribe_all decoding. Also fixes entity path mapping to strip the dimos/ prefix so Zenoh entity paths match LCM paths in the Rerun viewer. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- typed_out/untyped_out → typed_data/untyped_data - Use TypedMsg instead of Image for blueprint integration tests - Image still used in transport wrapper test (real LCM round-trip) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace raw time.sleep() calls with named helpers that document intent. wait_for_subscribers() explains Zenoh has no "subscriber ready" signal. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace manual if-both-received check with threading.Barrier(2). The previous approach could miss the event if both callbacks ran concurrently and checked the other's list before it was populated. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Review findings dimensionalOS#2 and dimensionalOS#4: - Remove Config.pubsubs from RerunBridgeModule — pubsubs are resolved lazily at start() from global_config.transport - Remove _zenoh_topic field from pZenohTransport — construct on demand like pLCMTransport does, avoiding dual state Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
8 new tests covering: - Typed/untyped topic → key expression conversion - Key expression → topic with known/unknown/missing type - Default lcm_type fallback - Round-trip typed and untyped Also documents known limitation: if a topic's base path ends with a segment matching a registered DimosMsg type name, _key_expr_to_topic will incorrectly split it. In practice this doesn't happen because stream names (cmd_vel, lidar) don't match type names. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Existing blueprints pass pubsubs=[LCM()] to RerunBridgeModule. Removing the field caused a Pydantic ValidationError (extra_forbidden). Keep the field but document that it's ignored — start() resolves the pubsub backend from global_config.transport instead. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
dev refactored blueprints.py → coordination/blueprints.py and coordination/module_coordinator.py. Applied our Zenoh changes to the new locations: - _get_transport_for() and _run_configurators() moved to module_coordinator.py - Service no longer generic — use config: ZenohConfig annotation - Updated test imports and mock paths for new module structure - Resolved transport.py import conflict (dev removed JpegLCM) - Resolved bridge.py conflict (dev added colormap registration) All 1559 tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TF (transform frames) is hardcoded to LCM in the Module base class. When transport=zenoh, module streams use Zenoh but TF stays on LCM. The bridge now listens on both so the robot pose updates in the viewer. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
running it crashes on import error (not below RuntimeError, I found this in code) raise RuntimeError(
"transport='zenoh' but eclipse-zenoh is not installed. "
"Install with: uv sync --extra zenoh"
)but executing |
|
@leshy I am not very familiar with |
This comment was marked as outdated.
This comment was marked as outdated.
Zenoh tests used time.sleep() to wait for subscriber propagation, which is either too slow or too flaky in CI. Replace with _retry_until() that re-publishes in a tight loop until the subscriber's Event fires. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Calls zenoh.init_log_from_env_or("warn") at module load so that
RUST_LOG=debug surfaces Zenoh's Rust-side transport logs (including
SHM negotiation). Defaults to warn to avoid noise.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
uv sync --extra zenoh would resolve dimos[dev] from PyPI instead of the local project, uninstalling other dependencies. The zenoh extra only needs eclipse-zenoh — base deps are already installed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
let's learn how to use UV, it's our main package manager, this is not the last feature of this type etc @paul-nechifor will know |
Greptile SummaryThis PR integrates eclipse-zenoh as an optional pub/sub transport alongside the existing LCM stack, adding
Confidence Score: 4/5Safe to merge with one P1 fix: the silent LCM fallback in _default_pubsubs should log a warning when Zenoh is configured but unavailable. One P1 finding: _default_pubsubs silently falls back to LCM when Zenoh is configured but not installed, causing invisible message loss in the Rerun bridge without any diagnostic message. The rest of the implementation is well-structured, follows existing patterns, has good lock discipline, and is covered by comprehensive tests. dimos/visualization/rerun/bridge.py — silent Zenoh unavailability fallback needs a warning log Important Files Changed
Sequence DiagramsequenceDiagram
participant App
participant ModuleCoordinator
participant _get_transport_for
participant ZenohTransport
participant Zenoh (LCMEncoderMixin)
participant ZenohPubSubBase
participant ZenohService
participant zenoh.Session
App->>ModuleCoordinator: deploy_blueprint()
ModuleCoordinator->>_get_transport_for: (name, stream_type)
_get_transport_for-->>ZenohTransport: ZenohTransport(topic, type)
ZenohTransport->>ZenohService: start()
ZenohService->>zenoh.Session: open(config) [once per session_key]
ZenohTransport->>Zenoh (LCMEncoderMixin): publish(LCMTopic, msg)
Zenoh (LCMEncoderMixin)->>ZenohPubSubBase: publish(topic, bytes)
ZenohPubSubBase->>zenoh.Session: publisher.put(bytes)
zenoh.Session-->>ZenohPubSubBase: callback(sample)
ZenohPubSubBase-->>Zenoh (LCMEncoderMixin): decode bytes → msg
Zenoh (LCMEncoderMixin)-->>App: subscriber callback(msg, topic)
Reviews (1): Last reviewed commit: "Merge branch 'dev' into feat/integrate-z..." | Re-trigger Greptile |
| if transport == "zenoh": | ||
| from dimos.core.transport import ZENOH_AVAILABLE | ||
|
|
||
| if ZENOH_AVAILABLE: | ||
| from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh | ||
|
|
||
| return [Zenoh(), LCM()] | ||
| return [LCM()] | ||
|
|
There was a problem hiding this comment.
Silent Zenoh unavailability fallback
When transport == "zenoh" but eclipse-zenoh is not installed, _default_pubsubs silently returns [LCM()] with no log warning. The bridge will subscribe to LCM only, so Zenoh-published messages are invisible in the viewer — and there is no indication of why.
_get_transport_for in module_coordinator.py raises RuntimeError("transport='zenoh' but eclipse-zenoh is not installed") for the same condition. _default_pubsubs should at minimum emit a warning so operators can diagnose the mismatch.
| if transport == "zenoh": | |
| from dimos.core.transport import ZENOH_AVAILABLE | |
| if ZENOH_AVAILABLE: | |
| from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh | |
| return [Zenoh(), LCM()] | |
| return [LCM()] | |
| if ZENOH_AVAILABLE: | |
| from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh | |
| return [Zenoh(), LCM()] | |
| logger.warning( | |
| "transport='zenoh' but eclipse-zenoh is not installed; " | |
| "RerunBridge falling back to LCM only. " | |
| "Install with: uv sync --extra zenoh" | |
| ) |
| from dimos.utils.logging_config import setup_logger | ||
|
|
There was a problem hiding this comment.
Side-effecting call placed between import statements
zenoh.init_log_from_env_or("warn") is interleaved between two import statements. While the intent (initialize Rust tracing before any zenoh operation) is sound, PEP 8 requires all imports to come first. Consider moving this call to the end of the import block or wrapping it in a function called at session creation time.
|
|
||
| @property | ||
| def session(self) -> zenoh.Session: | ||
| """Get the Zenoh Session instance for this service's config.""" | ||
| key = self.config.session_key | ||
| if key not in _sessions: | ||
| raise RuntimeError("Zenoh session not initialized — call start() first") | ||
| return _sessions[key] |
There was a problem hiding this comment.
session property reads shared dict without holding _sessions_lock
start() writes to _sessions under _sessions_lock, but session reads from it without the lock. The check-then-use pattern is logically racy and inconsistent with the lock discipline established in start(). Consider acquiring _sessions_lock in the session property before the if key not in _sessions check.
There was a problem hiding this comment.
Pull request overview
This PR introduces an optional Zenoh-based transport path alongside the existing LCM transport, wiring it through core transport selection, adding a Zenoh pubsub implementation + session service, and updating the Rerun viewer bridge and tests to exercise Zenoh integration.
Changes:
- Add a
zenohoptional dependency extra (eclipse-zenoh) and lockfile entries. - Implement Zenoh session management (
ZenohService) and Zenoh pubsub (ZenohPubSubBase,Zenoh,PickleZenoh), plus core transport wrappers and coordinator branching. - Extend existing test/benchmark harnesses and the Rerun bridge to operate with Zenoh (when available).
Reviewed changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
uv.lock |
Adds eclipse-zenoh package and a new zenoh extra in the resolved lock metadata. |
pyproject.toml |
Defines a new zenoh extra (eclipse-zenoh>=1.0.0,<2.0). |
dimos/core/global_config.py |
Adds transport setting (default lcm) to control transport selection. |
dimos/core/transport.py |
Adds ZENOH_AVAILABLE detection and Zenoh transport wrappers when available. |
dimos/core/coordination/module_coordinator.py |
Switches stream transport selection based on global_config.transport; gates LCM configurators. |
dimos/visualization/rerun/bridge.py |
Resolves pubsub backend lazily and normalizes entity paths for Zenoh topics. |
dimos/protocol/service/zenohservice.py |
Introduces singleton Zenoh session management service. |
dimos/protocol/service/test_zenohservice.py |
Adds unit tests for Zenoh session singleton behavior. |
dimos/protocol/pubsub/impl/zenohpubsub.py |
Implements Zenoh pubsub backend + key-expression/topic conversion helpers. |
dimos/protocol/pubsub/impl/test_zenohpubsub.py |
Adds integration tests for Zenoh pubsub behavior and conversions. |
dimos/protocol/pubsub/test_spec.py |
Extends the generic pubsub spec tests to include Zenoh variants when available. |
dimos/protocol/pubsub/benchmark/testdata.py |
Adds Zenoh to the pubsub benchmark testcase list when available. |
dimos/core/test_zenoh_transport.py |
Adds tests for coordinator branching and Zenoh transport wrapper lifecycle. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| mcp_host: str = "127.0.0.1" | ||
| transport: str = "lcm" | ||
| dtop: bool = False |
There was a problem hiding this comment.
GlobalConfig.transport is a free-form str, but the code branches on exact string values (e.g. "lcm" vs "zenoh"). To avoid silent fallback on typos and to get early validation from Pydantic settings, consider typing this as a Literal[...] (or an enum) consistent with viewer: ViewerBackend.
| if ZENOH_AVAILABLE: | ||
| from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh | ||
|
|
||
| return [Zenoh(), LCM()] |
There was a problem hiding this comment.
When transport == "zenoh" but ZENOH_AVAILABLE is false, this silently falls back to [LCM()]. That can hide a misconfiguration (the rest of the system raises for transport='zenoh' without the extra). Consider raising a clear error or at least logging a warning so the viewer doesn't quietly listen on the wrong transport.
| return [Zenoh(), LCM()] | |
| return [Zenoh(), LCM()] | |
| raise RuntimeError( | |
| "transport='zenoh' was requested for the rerun bridge, but Zenoh support " | |
| "is not available. Install the Zenoh extra/dependencies or choose a " | |
| "different transport." | |
| ) |
| _sessions: dict[str, zenoh.Session] = {} | ||
| _sessions_lock = threading.Lock() | ||
|
|
||
|
|
||
| class ZenohConfig(BaseConfig): | ||
| """Configuration for Zenoh service.""" | ||
|
|
||
| mode: str = "peer" | ||
| connect: list[str] = [] | ||
| listen: list[str] = [] | ||
|
|
||
| @property | ||
| def session_key(self) -> str: | ||
| """Produce a hashable key for singleton session lookup.""" | ||
| return f"{self.mode}|{json.dumps(sorted(self.connect))}|{json.dumps(sorted(self.listen))}" | ||
|
|
||
|
|
||
| class ZenohService(Service): | ||
| config: ZenohConfig | ||
|
|
||
| def start(self) -> None: | ||
| """Start the Zenoh service — opens a session if one doesn't exist for this config.""" | ||
| key = self.config.session_key | ||
| with _sessions_lock: | ||
| if key not in _sessions: | ||
| config = zenoh.Config() | ||
| config.insert_json5("mode", json.dumps(self.config.mode)) | ||
| if self.config.connect: | ||
| config.insert_json5("connect/endpoints", json.dumps(self.config.connect)) | ||
| if self.config.listen: | ||
| config.insert_json5("listen/endpoints", json.dumps(self.config.listen)) | ||
| _sessions[key] = zenoh.open(config) | ||
| logger.debug(f"Zenoh session opened in {self.config.mode} mode") | ||
| super().start() | ||
|
|
||
| def stop(self) -> None: | ||
| """Stop the Zenoh service — does NOT close the shared session.""" | ||
| super().stop() |
There was a problem hiding this comment.
ZenohService caches sessions globally in _sessions but never closes/evicts them (and stop() explicitly does not close the session). If code creates services with multiple distinct configs over the process lifetime, sessions (and Zenoh background threads/resources) will accumulate without a cleanup path. Consider adding reference counting, an explicit close()/shutdown() API, or closing/removing the session when no live service instances are using a given session_key.
| from __future__ import annotations | ||
|
|
||
| import pytest | ||
|
|
There was a problem hiding this comment.
These tests import dimos.protocol.service.zenohservice at module import time, which imports zenoh unconditionally. If the zenoh extra isn't installed (the default in most CI/dev envs), pytest collection will crash before any skipif can apply. Guard the module with pytest.importorskip("zenoh") / pytest.skip(..., allow_module_level=True) or move the import behind a ZENOH_AVAILABLE check so the file is safely skipped when Zenoh isn't installed.
| pytest.importorskip("zenoh") |
| from collections.abc import Callable | ||
|
|
||
| import pytest | ||
|
|
There was a problem hiding this comment.
This test module imports ZenohPubSubBase/zenohservice at import time; because zenohservice.py imports zenoh unconditionally, pytest collection will fail when the zenoh extra isn't installed. Add a module-level pytest.importorskip("zenoh") (or skip based on ZENOH_AVAILABLE) before importing Zenoh-dependent modules.
| pytest.importorskip("zenoh") |
| from dimos.core.stream import In, Out | ||
| from dimos.core.transport import ZENOH_AVAILABLE, LCMTransport, pLCMTransport | ||
| from dimos.msgs.sensor_msgs.Image import Image | ||
| from dimos.protocol.pubsub.impl.test_zenohpubsub import _retry_until | ||
|
|
There was a problem hiding this comment.
_retry_until is imported from dimos.protocol.pubsub.impl.test_zenohpubsub, which itself imports Zenoh modules unconditionally. This makes the entire test file fail to import when Zenoh isn't installed, even though most tests here are not Zenoh-dependent. Consider duplicating _retry_until locally (or moving it to a non-Zenoh test utility module) and guarding Zenoh-only imports/tests with pytest.importorskip("zenoh") / skipif(not ZENOH_AVAILABLE) at module level.
| from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators | ||
|
|
||
| configurators = [*lcm_configurators(), *blueprint.configurator_checks] | ||
| lcm_checks = lcm_configurators() if global_config.transport == "lcm" else [] |
There was a problem hiding this comment.
_run_configurators() now skips all LCM system configurators when global_config.transport != "lcm". However ModuleConfig still uses LCM for RPC (LCMRPC) and TF (LCMTF) regardless of the stream transport, so skipping multicast/buffer configuration can break core functionality even in transport="zenoh" mode. Consider always running LCM configurators (or at least the subset required for RPC/TF), and gating only the parts that are strictly tied to LCM pubsub streams.
| lcm_checks = lcm_configurators() if global_config.transport == "lcm" else [] | |
| # LCM is still used by core facilities such as RPC/TF even when the | |
| # selected stream transport is not "lcm", so LCM configurators must | |
| # always run here. Only configurators that are strictly specific to | |
| # optional LCM pubsub streams should be gated elsewhere. | |
| lcm_checks = lcm_configurators() |

Problem
Closes DIM-XXX
Solution
Breaking Changes
How to Test
Contributor License Agreement