Conversation
Greptile Summary

This PR introduces DimSim as a third simulation backend for the Unitree Go2 connection.

Confidence Score: 3/5 — not safe to merge. The odom stream bug will cause the new e2e test to fail and leave the odom output port permanently silent for this backend: a P1 defect in the core data path of the new backend. All other additions are solid.

Important Files Changed: dimos/robot/unitree/dimsim_connection.py — _handle_odom must push to the odom_stream() Subject.
Sequence Diagram

```mermaid
sequenceDiagram
    participant GC as GO2Connection
    participant DC as DimSimConnection
    participant OT as LCMTransport(/odom)
    participant TF as LCMTF
    participant DP as DimSimProcess
    participant DS as DimSim (Deno)
    GC->>DC: start()
    DC->>DP: start()
    DP->>DS: subprocess (deno run cli.ts dev)
    DC->>OT: start() + subscribe(_handle_odom)
    DC->>TF: start()
    GC->>DC: connection.odom_stream().subscribe(_publish_tf)
    Note over GC,DC: Subject returned — nobody pushes to it ⚠️
    DS-->>OT: LCM /odom PoseStamped
    OT-->>DC: _handle_odom(msg)
    DC->>TF: publish(transforms)
    Note over DC: odom_stream Subject NOT updated ⚠️
    GC->>DC: move(twist)
    DC-->>GC: True (stub)
```
Reviews (2) — last reviewed commit: "feat(sim): add dimsim"
```python
self._start_log_reader()
self._start_lcm_listener()

# Wait for first odom message as readiness signal.
timeout = 60.0
start_time = time.time()
while time.time() - start_time < timeout:
    if self.process.poll() is not None:
        exit_code = self.process.returncode
        stderr = ""
        if self.process.stderr:
            stderr = self.process.stderr.read().decode(errors="replace")
        self.stop()
        raise RuntimeError(f"DimSim process exited early (code {exit_code})\n{stderr}")
```
stderr already consumed by log reader thread
_start_log_reader() launches a daemon thread at line 204 that iterates over self.process.stderr. Then, when the early-exit branch is reached (line 211), self.process.stderr.read() at line 215 races with that thread — the log reader has likely already drained the pipe, so stderr will almost always be empty, making the error message useless for diagnosing why DimSim failed to start.
Either start the log reader after the readiness-wait loop, or buffer the subprocess output with a deque in the log reader thread so the error handler can read from that buffer instead of the pipe directly.
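The deque-buffering option can be sketched roughly as follows. This is a self-contained illustration, not the PR's code: `ProcessWithLogTail`, `_stderr_tail`, and `error_report` are hypothetical names; the point is that the log reader thread keeps the last N stderr lines so the early-exit handler reads from that buffer instead of racing on the already-drained pipe.

```python
import collections
import subprocess
import sys
import threading

class ProcessWithLogTail:
    """Sketch: buffer subprocess stderr in a deque so error handlers
    can report recent output without reading the pipe themselves."""

    def __init__(self, cmd: list[str]) -> None:
        self.process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # Keep only the most recent 200 lines to bound memory.
        self._stderr_tail: collections.deque[str] = collections.deque(maxlen=200)
        self._reader = threading.Thread(target=self._drain_stderr, daemon=True)
        self._reader.start()

    def _drain_stderr(self) -> None:
        # Log reader thread: the ONLY consumer of the stderr pipe.
        assert self.process.stderr is not None
        for raw in self.process.stderr:
            self._stderr_tail.append(raw.decode(errors="replace"))

    def error_report(self) -> str:
        # Early-exit handler reads the buffer, never the pipe.
        return "".join(self._stderr_tail)

proc = ProcessWithLogTail(
    [sys.executable, "-c", "import sys; sys.stderr.write('boom')"]
)
proc.process.wait()
proc._reader.join(timeout=5)
print(proc.error_report())
```

The deque read is safe without a lock here because `deque.append` is atomic and the handler only joins the reader after the process has exited.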
```python
@functools.cache
def lidar_stream(self) -> Observable[PointCloud2]:
    def getter() -> PointCloud2 | None:
        if self._lidar_seq > self._last_lidar_seq:
            self._last_lidar_seq = self._lidar_seq
            return self._latest_lidar
        return None

    return self._create_stream(getter, _LIDAR_FPS, "Lidar")

@functools.cache
def odom_stream(self) -> Observable[PoseStamped]:
    def getter() -> PoseStamped | None:
        if self._odom_seq > self._last_odom_seq:
            self._last_odom_seq = self._odom_seq
            return self._latest_odom
        return None

    return self._create_stream(getter, _ODOM_FPS, "Odom")

@functools.cache
def video_stream(self) -> Observable[Image]:
    def getter() -> Image | None:
        if self._image_seq > self._last_image_seq:
            self._last_image_seq = self._image_seq
            return self._latest_image
        return None

    return self._create_stream(getter, _VIDEO_FPS, "Video")
```
@functools.cache on instance methods — class-level cache causes leaks and cross-instance invalidation
functools.cache (an alias for lru_cache(maxsize=None)) stores its cache on the function object, not on self. The result is:
- Memory leak — the cache keeps a strong reference to every `self` that was ever passed as a key, preventing garbage collection.
- `cache_clear()` in `stop()` is global — calling `self.lidar_stream.cache_clear()` clears the entries for all `DimSimConnection` instances, not just the one being stopped.
The standard fix is to store the Observable in an instance dict on first call and reset it in stop() instead of relying on cache_clear().
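A minimal sketch of that instance-dict pattern (class and attribute names here are illustrative, not from the PR; a plain `object()` stands in for the Observable):

```python
class Connection:
    """Sketch: per-instance stream cache instead of functools.cache."""

    def __init__(self) -> None:
        # Cache lives on self, so it dies with the instance.
        self._streams: dict[str, object] = {}

    def lidar_stream(self) -> object:
        if "lidar" not in self._streams:
            self._streams["lidar"] = object()  # stand-in for the Observable
        return self._streams["lidar"]

    def stop(self) -> None:
        # Resets THIS instance only; other connections keep their caches.
        self._streams.clear()

a, b = Connection(), Connection()
s = a.lidar_stream()
assert a.lidar_stream() is s       # cached on repeat calls
assert b.lidar_stream() is not s   # not shared across instances
a.stop()
assert a.lidar_stream() is not s   # a's cache reset; b is unaffected
```

Because nothing outside the instance holds a reference to the cached Observables, dropping the last reference to a connection frees its streams too.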
```python
if self.process:
    if self.process.stderr:
        self.process.stderr.close()
```
stdout pipe is never closed in stop()
stop() closes self.process.stderr but leaves self.process.stdout open, leaking the file descriptor. Consider closing both:
```diff
 if self.process:
     if self.process.stderr:
         self.process.stderr.close()
+    if self.process.stdout:
+        self.process.stdout.close()
```
```python
def getter() -> PointCloud2 | None:
    if self._lidar_seq > self._last_lidar_seq:
        self._last_lidar_seq = self._lidar_seq
        return self._latest_lidar
    return None

return self._create_stream(getter, _LIDAR_FPS, "Lidar")
```
Unsynchronised access to shared state in stream getters
The getter closures read and write self._lidar_seq, self._last_lidar_seq, and self._latest_lidar from polling threads while the LCM callback thread writes them concurrently. There is a TOCTOU window where _latest_lidar can be None even after the sequence check passes. A threading.Lock protecting reads and writes to these pairs would close the window.
Force-pushed 9d01ca0 to d26e60d
Force-pushed d26e60d to 6e02118
Co-authored-by: Viswajit Nair <viswajitnair@gmail.com>
Force-pushed 6e02118 to 1aec40d
```python
def start(self) -> None:
    self._dimsim_process.start()
    self._odom_transport.start()
    self._unsubscribe_odom = self._odom_transport.subscribe(self._handle_odom)
    self._tf.start()

def stop(self) -> None:
    self._tf.stop()
    if self._unsubscribe_odom is not None:
        self._unsubscribe_odom()
    self._odom_transport.stop()
    self._dimsim_process.stop()
```
Odom data received via LCM is never forwarded to odom_stream()
_handle_odom receives PoseStamped messages from the /odom LCM transport but only uses them to publish TF. It never pushes to the odom_stream() Subject. As a result, every subscriber to odom_stream() (including GO2Connection.start(), which chains it to _publish_tf → odom.publish()) will never receive a message. The odom output port on GO2Connection stays permanently silent for this backend, breaking any consumer that relies on odometry — including the e2e test that calls wait_until_odom_position.
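The shape of the fix can be sketched as below. This is a self-contained illustration, not the PR's code: a tiny `Subject` stand-in replaces the RxPY Subject so the example runs without dependencies, a list stands in for the TF transport, and the `DimSimConnection` here is a stripped-down hypothetical.

```python
class Subject:
    """Tiny stand-in for an Rx Subject: fan values out to subscribers."""

    def __init__(self) -> None:
        self._observers = []

    def subscribe(self, on_next) -> None:
        self._observers.append(on_next)

    def on_next(self, value) -> None:
        for on_next in self._observers:
            on_next(value)

class DimSimConnection:
    def __init__(self) -> None:
        self._odom_subject = Subject()
        self.published_tf = []  # stand-in for the TF transport

    def odom_stream(self) -> Subject:
        return self._odom_subject

    def _handle_odom(self, msg) -> None:
        self.published_tf.append(msg)    # existing behaviour: publish TF
        self._odom_subject.on_next(msg)  # the missing step: feed subscribers

conn = DimSimConnection()
received = []
conn.odom_stream().subscribe(received.append)
conn._handle_odom({"x": 1.0, "y": 2.0})
assert received == [{"x": 1.0, "y": 2.0}]
```

With the `on_next` call in place, the chain described in the sequence diagram (`odom_stream()` → `_publish_tf` → `odom.publish()`) receives every pose, and `wait_until_odom_position` can see movement.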
No description provided.