feat: add CLI shell and exec commands for pod terminal access#124
feat: add CLI shell and exec commands for pod terminal access#124
Conversation
Add two new commands under `centml cluster`:
- `shell <id>` -- interactive terminal session (like docker exec -it)
- `exec <id> -- <command>` -- run a command and return output (like ssh host "cmd")
Both connect via WebSocket through the Platform API terminal proxy,
matching the same protocol used by the Web UI TerminalView.
06e9ae1 to
155fdbf
Compare
Replaces str.replace("https://", "wss://") with urllib.parse.urlparse
scheme replacement to avoid CodeQL py/incomplete-url-substring-sanitization.
Replace url.startswith() assertions with urllib.parse.urlparse() checks to satisfy CodeQL py/incomplete-url-substring-sanitization rule. Reformat both shell.py and test_shell.py with black.
0073b53 to
095bd1e
Compare
Add pyte as local terminal emulator (equivalent to xterm.js) to solve cursor positioning and line wrapping issues. Feed WebSocket output through pyte Screen/Stream and render only dirty lines with ANSI escape sequences.
shutil.get_terminal_size() returns (columns, lines), not (rows, cols). The swapped unpacking caused pyte Screen to be created with terminal line count as width, making the display extremely narrow.
…andling Replace regex-based _strip_ansi with pyte single-row screen for marker detection. pyte interprets all VT100/VT220 sequences including OSC and truecolor escapes that the regex could miss.
If two Code signals arrive within 3 seconds, the shell has exited and the reconnect just opened a new session. Exit cleanly instead of looping forever.
Logs to /tmp/centml_shell_debug.log (overridable via CENTML_SHELL_DEBUG_LOG env var). Traces every WS message, stdin event, task lifecycle, reconnect decision, and connection close.
The platform API proxy never forwards ArgoCD Code messages and does not close the WebSocket when the remote shell exits. Replace the Code/reconnect logic with exit echo detection: when the server echoes back "exit\r\n", arm a 2-second idle timeout on ws.recv(). If no more data arrives, the shell has exited -- break out cleanly. Also removes Code handling from _exec_session (markers already work).
…ompt When "exit\r\n" appears at the end of ws data (nothing after it), the shell has exited -- return immediately instead of waiting 2s. When "exit\r\n" is followed by a new prompt (e.g. from echo exit), ignore it and continue the session.
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
| @click.option("--pod", default=None, help="Specify a pod name") | ||
| @click.option("--shell", "shell_type", default=None, type=click.Choice(["bash", "sh", "zsh"]), help="Shell type") | ||
| @click.option( | ||
| "--first-pod", is_flag=True, default=False, help="Auto-select the first running pod (skip interactive selection)" |
There was a problem hiding this comment.
I don't think we need this right? if --pod is not provided, then we default to first pod
| @click.option("--pod", default=None, help="Specific pod name") | ||
| @click.option("--shell", "shell_type", default=None, type=click.Choice(["bash", "sh", "zsh"]), help="Shell type") | ||
| @click.option( | ||
| "--first-pod", is_flag=True, default=False, help="Auto-select the first running pod (skip interactive selection)" |
| ) | ||
|
|
||
|
|
||
| def _resolve_pod(running_pods: list[str], pod_name: str) -> str: |
There was a problem hiding this comment.
I believe this check is also handled at the API. we should be able to remove it from the client
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
WalkthroughAdds WebSocket-based interactive and non-interactive shell commands to the CLI, SDK session utilities and exceptions, a v3 status API method, new runtime deps, and comprehensive tests for pod resolution, WS sessions, and terminal rendering. Changes
Sequence Diagram(s)sequenceDiagram
actor User
participant CLI as "CLI Shell"
participant API as "CentML API"
participant WS as "WebSocket / Pod"
User->>CLI: Invoke shell/exec command
CLI->>API: Query deployment status / running pods
API-->>CLI: Pod list
alt Single pod or --first-pod
CLI->>CLI: Auto-select pod
else Multiple pods
CLI->>User: Prompt for pod
User-->>CLI: Select pod
end
CLI->>API: Request auth token
API-->>CLI: Token
CLI->>CLI: Build WS URL
CLI->>WS: Connect with token
WS-->>CLI: Connection established
alt Interactive
CLI->>WS: Resize PTY
rect rgba(100, 150, 200, 0.5)
loop Bidirectional I/O
User->>CLI: stdin
CLI->>WS: send stdin frames
WS->>CLI: stdout/stderr frames
CLI->>User: display output
end
end
else Exec (non-interactive)
CLI->>WS: Resize + send wrapped command (with markers)
rect rgba(100, 150, 200, 0.5)
WS->>CLI: Output including BEGIN/END markers
CLI->>CLI: Parse output, extract exit code
end
CLI->>User: show output and exit code
end
CLI->>WS: Close connection
WS-->>CLI: Closed
CLI->>User: Return exit code
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (1)
tests/test_sdk_shell_session.py (1)
315-323: Consider explicitly managing the background shutdown task.The fire-and-forget task pattern here works, but it's implicit. While Python 3.10+ guarantees the task won't be collected, explicitly retaining a reference and cleaning it up in a
finallyblock (or using a context manager) would make the intent clearer and reduce the risk of subtle issues if the code evolves.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_sdk_shell_session.py` around lines 315 - 323, The test starts a fire-and-forget task via asyncio.create_task(_set_shutdown()) without keeping a reference; change the _run coroutine to store the task (e.g., shutdown_task = asyncio.create_task(_set_shutdown())), then wrap the call to forward_io(ws, [80, 24], shutdown) in a try/finally where in finally you cancel the shutdown_task (if still running) and await it to ensure cleanup; reference symbols to edit: _run, _set_shutdown, shutdown, forward_io, and the asyncio.create_task call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@centml/cli/shell.py`:
- Around line 41-56: The _connect_args function can block by calling _select_pod
(which uses click.prompt) when multiple pods exist; modify _connect_args to
avoid prompting when stdin is not a TTY: import sys and before calling
_select_pod check sys.stdin.isatty(), and if it's False and neither pod nor
first_pod is provided raise click.ClickException instructing the caller to pass
--pod or --first-pod; keep the existing behavior (call _select_pod) only when
sys.stdin.isatty() is True. Ensure this logic is applied inside the
_connect_args function surrounding the branch that currently sets pod_name =
_select_pod(running_pods, deployment_id).
- Around line 89-91: The exec_cmd function currently concatenates the
Click-provided command tuple with " ".join(command), which loses
quoting/escaping; replace that with shlex.join(command) and ensure shlex is
imported so exec_cmd calls asyncio.run(exec_session(ws_url, token,
shlex.join(command))) to preserve argument boundaries for POSIX shells (refer to
exec_cmd and exec_session symbols).
In `@centml/sdk/shell/session.py`:
- Around line 226-229: The code currently initializes exit_code = 0 and silently
falls through on ConnectionClosed, causing a false "success" when END_MARKER
wasn't received; update the ConnectionClosed handling in
centml/sdk/shell/session.py to treat a socket close before detecting END_MARKER
as a transport failure by either setting exit_code to a non-zero value (e.g., 1)
or raising an exception (e.g., RuntimeError("socket closed before END_MARKER")),
and ensure this logic checks the buffer/is_done/END_MARKER state so you only
mark failure when the trailer was not parsed; apply the same change to the
second occurrence referenced (around lines 258-260) so both code paths return
non-zero or raise on missing END_MARKER.
- Around line 11-12: The code calls websockets.connect(...) with
additional_headers (in centml/sdk/shell/session.py at the two call sites that
currently use websockets.connect around lines 162 and 208) which is only
supported by the new asyncio client; change the import to use the asyncio client
explicitly by importing connect from websockets.asyncio.client and replace
websockets.connect(...) calls with connect(...), or alternatively raise the
minimum requirement to websockets>=14.0 in requirements so the existing import
"import websockets" remains valid—pick one approach and apply it consistently to
both call sites.
In `@tests/test_cli_shell.py`:
- Around line 43-68: The test helper _patch_deps currently only patches
asyncio.run, causing real coroutine objects from interactive_session and
exec_session to be created (leading to un-awaited coroutine warnings and
preventing assertions); update _patch_deps to also patch
centml.cli.shell.interactive_session and centml.cli.shell.exec_session (or have
the patched asyncio.run explicitly await/close a fake coroutine) so tests can
inspect calls and avoid warnings; ensure the patched names (interactive_session,
exec_session) return a simple mock/coroutine that records the URL, shell type,
and command used by shell() and exec_cmd() for assertions.
---
Nitpick comments:
In `@tests/test_sdk_shell_session.py`:
- Around line 315-323: The test starts a fire-and-forget task via
asyncio.create_task(_set_shutdown()) without keeping a reference; change the
_run coroutine to store the task (e.g., shutdown_task =
asyncio.create_task(_set_shutdown())), then wrap the call to forward_io(ws, [80,
24], shutdown) in a try/finally where in finally you cancel the shutdown_task
(if still running) and await it to ensure cleanup; reference symbols to edit:
_run, _set_shutdown, shutdown, forward_io, and the asyncio.create_task call.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1295c58d-f50f-4796-812b-7dac84fb274d
📒 Files selected for processing (11)
centml/cli/main.pycentml/cli/shell.pycentml/sdk/api.pycentml/sdk/shell/__init__.pycentml/sdk/shell/exceptions.pycentml/sdk/shell/session.pyrequirements.txttests/conftest.pytests/test_cli_shell.pytests/test_sdk_shell_renderer.pytests/test_sdk_shell_session.py
| def _connect_args(deployment_id, pod, shell_type, first_pod=False): | ||
| """Resolve pod, build WebSocket URL, and obtain auth token.""" | ||
| with get_centml_client() as cclient: | ||
| running_pods = get_running_pods(cclient, deployment_id) | ||
| if not running_pods: | ||
| raise click.ClickException(f"No running pods found for deployment {deployment_id}") | ||
|
|
||
| if pod is not None: | ||
| try: | ||
| pod_name = _resolve_pod(running_pods, pod) | ||
| except ShellError as exc: | ||
| raise click.ClickException(str(exc)) from exc | ||
| elif len(running_pods) == 1 or first_pod: | ||
| pod_name = running_pods[0] | ||
| else: | ||
| pod_name = _select_pod(running_pods, deployment_id) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's understand the structure of the file and locate the key functions
cd /repo
find . -name "shell.py" -path "*/cli/*" 2>/dev/null | head -5Repository: CentML/centml-python-client
Length of output: 165
🏁 Script executed:
#!/bin/bash
# Read the entire shell.py file to understand the context
wc -l centml/cli/shell.pyRepository: CentML/centml-python-client
Length of output: 93
🏁 Script executed:
#!/bin/bash
# Get the content of lines 1-100 to see the function definitions and calls
cat -n centml/cli/shell.py | head -100Repository: CentML/centml-python-client
Length of output: 4356
Avoid prompting from exec_cmd when stdin isn't interactive.
The exec_cmd() function calls _connect_args() without checking TTY, but _connect_args() can trigger _select_pod() which calls click.prompt() and will block when multiple pods exist and --pod is not specified. This causes unexpected hangs or failures in non-interactive contexts like CI and scripts. Only shell() has the TTY check (line 72–73). Gate the prompting logic in _connect_args() on sys.stdin.isatty() and require --pod or --first-pod for non-interactive callers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@centml/cli/shell.py` around lines 41 - 56, The _connect_args function can
block by calling _select_pod (which uses click.prompt) when multiple pods exist;
modify _connect_args to avoid prompting when stdin is not a TTY: import sys and
before calling _select_pod check sys.stdin.isatty(), and if it's False and
neither pod nor first_pod is provided raise click.ClickException instructing the
caller to pass --pod or --first-pod; keep the existing behavior (call
_select_pod) only when sys.stdin.isatty() is True. Ensure this logic is applied
inside the _connect_args function surrounding the branch that currently sets
pod_name = _select_pod(running_pods, deployment_id).
| def exec_cmd(deployment_id, command, pod, shell_type, first_pod): | ||
| ws_url, token = _connect_args(deployment_id, pod, shell_type, first_pod) | ||
| exit_code = asyncio.run(exec_session(ws_url, token, " ".join(command))) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the file and the specific lines mentioned
cat -n centml/cli/shell.py | sed -n '80,100p'Repository: CentML/centml-python-client
Length of output: 965
🏁 Script executed:
# Also check the imports at the top of the file
head -20 centml/cli/shell.pyRepository: CentML/centml-python-client
Length of output: 510
🏁 Script executed:
# Let's understand the broader context of the exec_cmd function
cat -n centml/cli/shell.py | sed -n '85,110p'Repository: CentML/centml-python-client
Length of output: 522
🏁 Script executed:
# Check if shlex is already imported anywhere in the file
grep -n "import shlex" centml/cli/shell.pyRepository: CentML/centml-python-client
Length of output: 53
🏁 Script executed:
# Search for how command is passed to exec_cmd - look for Click decorators
cat -n centml/cli/shell.py | sed -n '1,100p'Repository: CentML/centml-python-client
Length of output: 4356
🏁 Script executed:
# Check Python version requirements
fd -t f -e "toml" -e "cfg" -e "txt" | xargs grep -l "python" 2>/dev/null | head -5Repository: CentML/centml-python-client
Length of output: 87
🏁 Script executed:
# Look for pyproject.toml or setup.py
ls -la centml/ | head -20
git ls-files | grep -E "(pyproject|setup)" | head -10Repository: CentML/centml-python-client
Length of output: 409
🏁 Script executed:
# Check the project config
cat pyproject.toml 2>/dev/null || cat setup.py 2>/dev/nullRepository: CentML/centml-python-client
Length of output: 747
Use shlex.join() to preserve command argument boundaries.
The command argument is tokenized by Click (nargs=-1, type=click.UNPROCESSED). Using " ".join(command) loses quoting and escaping, so inputs like "hello world", $HOME, or * are misinterpreted in the pod shell. Use shlex.join(command) for the POSIX shells (bash, sh, zsh) specified here.
Suggested fix
+import shlex
...
- exit_code = asyncio.run(exec_session(ws_url, token, " ".join(command)))
+ exit_code = asyncio.run(exec_session(ws_url, token, shlex.join(command)))🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@centml/cli/shell.py` around lines 89 - 91, The exec_cmd function currently
concatenates the Click-provided command tuple with " ".join(command), which
loses quoting/escaping; replace that with shlex.join(command) and ensure shlex
is imported so exec_cmd calls asyncio.run(exec_session(ws_url, token,
shlex.join(command))) to preserve argument boundaries for POSIX shells (refer to
exec_cmd and exec_session symbols).
| import websockets | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat requirements.txt | grep -i websocketsRepository: CentML/centml-python-client
Length of output: 87
🏁 Script executed:
head -20 centml/sdk/shell/session.py && sed -n '155,170p' centml/sdk/shell/session.py && sed -n '200,215p' centml/sdk/shell/session.pyRepository: CentML/centml-python-client
Length of output: 2012
🌐 Web query:
websockets library version 13 vs 14 additional_headers extra_headers API change
💡 Result:
In the Python websockets library, the key API change between versions 13 and 14 regarding headers is in the client connection function (connect). Version 13 and earlier (legacy asyncio implementation) used the parameter name "extra_headers" for adding custom HTTP headers to the WebSocket opening handshake request. Starting with version 14.0 (November 9, 2024), which makes the new asyncio implementation the default, the parameter was renamed to "additional_headers". This change is part of the upgrade to the new asyncio implementation introduced in 13.0 but made default in 14.0. Code using "extra_headers" in v14+ will raise TypeError: BaseEventLoop.create_connection got an unexpected keyword argument 'extra_headers' because the new implementation passes kwargs directly to asyncio's create_connection, which does not accept "extra_headers". To fix, replace extra_headers=... with additional_headers=... . Example: Old (v13): websockets.connect(uri, extra_headers={"Authorization": "Bearer token"}) New (v14+): websockets.connect(uri, additional_headers={"Authorization": "Bearer token"}) The legacy implementation (websockets.legacy.client.connect) still supports "extra_headers" for backwards compatibility. Official docs confirm "additional_headers" in v14+ client API: HeadersLike|None – Arbitrary HTTP headers to add to the handshake request. Server-side changes exist (e.g., extra_headers -> process_response), but the query focuses on client headers. No other major header-related changes between 13 and 14.
Citations:
- 1: https://websockets.readthedocs.io/en/14.0/project/changelog.html
- 2: https://websockets.readthedocs.io/en/14.0/
- 3: extra_headers not compatible with v14.0 python-websockets/websockets#1588
- 4: https://websockets.readthedocs.io/en/13.0/howto/upgrade.html
- 5: https://websockets.readthedocs.io/en/15.0/reference/asyncio/client.html
- 6: https://websockets.readthedocs.io/en/13.0/faq/client.html
- 7: https://websockets.readthedocs.io/en/14.2/faq/client.html
- 8: https://websockets.readthedocs.io/en/14.0/reference/asyncio/client.html
Update websockets dependency or import the asyncio client explicitly.
Code at lines 162 and 208 calls websockets.connect() with the additional_headers parameter, which is incompatible with websockets 13.x. The library changed the parameter name from extra_headers (v13.x, legacy client) to additional_headers (v14.0+, new asyncio client), but requirements.txt allows websockets>=13.0, which includes incompatible versions. Either raise the minimum version to websockets>=14.0, or import connect from websockets.asyncio.client to use the new client API consistently across all allowed versions.
Suggested fix
import pyte
import websockets
+from websockets.asyncio.client import connect
@@
- async with websockets.connect(ws_url, additional_headers=headers, close_timeout=2) as ws:
+ async with connect(ws_url, additional_headers=headers, close_timeout=2) as ws:
@@
- async with websockets.connect(ws_url, additional_headers=headers, close_timeout=2) as ws:
+ async with connect(ws_url, additional_headers=headers, close_timeout=2) as ws:Also applies to: 162-162, 208-208
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@centml/sdk/shell/session.py` around lines 11 - 12, The code calls
websockets.connect(...) with additional_headers (in centml/sdk/shell/session.py
at the two call sites that currently use websockets.connect around lines 162 and
208) which is only supported by the new asyncio client; change the import to use
the asyncio client explicitly by importing connect from
websockets.asyncio.client and replace websockets.connect(...) calls with
connect(...), or alternatively raise the minimum requirement to websockets>=14.0
in requirements so the existing import "import websockets" remains valid—pick
one approach and apply it consistently to both call sites.
| exit_code = 0 | ||
| buffer = "" | ||
| is_capturing = False | ||
| is_done = False |
There was a problem hiding this comment.
Don't report success when the socket closes before END_MARKER.
exit_code starts at 0, and the ConnectionClosed path just falls through. If the pod or proxy disconnects before the trailer is parsed, centml cluster exec returns success even though the command outcome is unknown. Please treat “no end marker received” as a transport failure and return non-zero, or raise instead.
Suggested fix
- exit_code = 0
+ exit_code = None
@@
- except websockets.ConnectionClosed:
- pass
- return exit_code
+ except websockets.ConnectionClosed:
+ if not is_done:
+ sys.stderr.write("Error: terminal session closed before command completion\n")
+ return 1
+ return exit_code if is_done and exit_code is not None else 1Also applies to: 258-260
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@centml/sdk/shell/session.py` around lines 226 - 229, The code currently
initializes exit_code = 0 and silently falls through on ConnectionClosed,
causing a false "success" when END_MARKER wasn't received; update the
ConnectionClosed handling in centml/sdk/shell/session.py to treat a socket close
before detecting END_MARKER as a transport failure by either setting exit_code
to a non-zero value (e.g., 1) or raising an exception (e.g.,
RuntimeError("socket closed before END_MARKER")), and ensure this logic checks
the buffer/is_done/END_MARKER state so you only mark failure when the trailer
was not parsed; apply the same change to the second occurrence referenced
(around lines 258-260) so both code paths return non-zero or raise on missing
END_MARKER.
| @contextmanager | ||
| def _patch_deps(*, pods=None, tty=True): | ||
| """Patch common CLI dependencies used by ``_connect_args``. | ||
|
|
||
| If *pods* is provided, ``get_running_pods`` is also patched with that | ||
| return value. Yields a namespace exposing the mock objects that tests | ||
| most often assert against. | ||
| """ | ||
| with ExitStack() as stack: | ||
| e = stack.enter_context | ||
| e(patch("centml.cli.shell.get_centml_client", new_callable=_mock_client_ctx)) | ||
| if pods is not None: | ||
| e(patch("centml.cli.shell.get_running_pods", return_value=pods)) | ||
| ns = SimpleNamespace( | ||
| auth=e(patch("centml.cli.shell.auth")), | ||
| settings=e(patch("centml.cli.shell.settings")), | ||
| asyncio=e(patch("centml.cli.shell.asyncio")), | ||
| sys=e(patch("centml.cli.shell.sys")), | ||
| build_ws_url=e(patch("centml.cli.shell.build_ws_url")), | ||
| ) | ||
| ns.auth.get_centml_token.return_value = "token" | ||
| ns.settings.CENTML_PLATFORM_API_URL = "https://api.centml.com" | ||
| ns.asyncio.run.return_value = 0 | ||
| ns.sys.stdin.isatty.return_value = tty | ||
| ns.build_ws_url.return_value = "wss://test/ws" | ||
| yield ns |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
fd -t f "test_cli_shell.py" --exec wc -l {}Repository: CentML/centml-python-client
Length of output: 100
🏁 Script executed:
fd -t f "cli.*shell" -e py | head -20Repository: CentML/centml-python-client
Length of output: 94
🏁 Script executed:
git ls-files "tests/test_cli_shell.py" "centml/cli/shell.py" 2>/dev/null | head -10Repository: CentML/centml-python-client
Length of output: 114
🏁 Script executed:
cat -n tests/test_cli_shell.py | head -100Repository: CentML/centml-python-client
Length of output: 4331
🏁 Script executed:
cat -n centml/cli/shell.pyRepository: CentML/centml-python-client
Length of output: 4356
🏁 Script executed:
cat -n tests/test_cli_shell.py | sed -n '86,140p'Repository: CentML/centml-python-client
Length of output: 2677
🏁 Script executed:
grep -n "def test_" tests/test_cli_shell.py | head -20Repository: CentML/centml-python-client
Length of output: 832
🏁 Script executed:
grep -n "interactive_session\|exec_session" centml/cli/shell.pyRepository: CentML/centml-python-client
Length of output: 265
🏁 Script executed:
cat -n tests/test_cli_shell.py | sed -n '140,223p'Repository: CentML/centml-python-client
Length of output: 3990
🏁 Script executed:
grep -n "interactive_session\|exec_session" tests/test_cli_shell.pyRepository: CentML/centml-python-client
Length of output: 53
🏁 Script executed:
rg "interactive_session|exec_session" --type py -A 2Repository: CentML/centml-python-client
Length of output: 4420
🏁 Script executed:
python3 << 'EOF'
# Check if asyncio.run is called with a coroutine and what happens
import asyncio
from unittest.mock import MagicMock
async def example_coro():
return 42
# Simulate the test scenario
mock_run = MagicMock(return_value=0)
# This is what happens in the test - a coroutine is created
coro = example_coro()
# Then it's passed to the mocked asyncio.run
result = mock_run(coro)
print(f"Result from mocked run: {result}")
print(f"Coroutine object: {coro}")
print(f"Coroutine closed: {coro.cr_frame is None}")
# The coroutine is not closed/awaited
try:
coro.close()
print("Coroutine successfully closed after test")
except Exception as e:
print(f"Error closing: {e}")
EOFRepository: CentML/centml-python-client
Length of output: 229
🏁 Script executed:
grep -n "asyncio.run.call_args\|asyncio.run.assert_called_with" tests/test_cli_shell.pyRepository: CentML/centml-python-client
Length of output: 53
🏁 Script executed:
grep -n "interactive_session\|exec_session" centml/cli/shell.py | grep -v "^[0-9]*:.*import\|^[0-9]*:.*#"Repository: CentML/centml-python-client
Length of output: 265
🏁 Script executed:
grep -C 5 "asyncio.run.call_args\|call_args\[0\]" tests/test_cli_shell.pyRepository: CentML/centml-python-client
Length of output: 2781
🏁 Script executed:
python3 << 'EOF'
# Simulate what happens in the tests to understand the issue better
import asyncio
from unittest.mock import MagicMock, patch
from contextlib import contextmanager, ExitStack
# Create a simplified version of _patch_deps
`@contextmanager`
def _patch_deps_simple():
with ExitStack() as stack:
e = stack.enter_context
asyncio_mock = e(patch("asyncio.run"))
asyncio_mock.return_value = 0
yield asyncio_mock
# Simulate the CLI code
async def interactive_session(ws_url, token):
return 0
# Simulate the test scenario
print("=== Test Scenario ===")
with _patch_deps_simple() as mock_asyncio_run:
# This is what shell() does
ws_url = "wss://test/ws"
token = "token"
coro = interactive_session(ws_url, token)
print(f"1. Coroutine created: {coro}")
print(f"2. Coroutine closed before call: {coro.cr_frame is None}")
# Call the mocked asyncio.run
exit_code = mock_asyncio_run(coro)
print(f"3. Return value from mocked run: {exit_code}")
print(f"4. Coroutine closed after mocked run: {coro.cr_frame is None}")
print(f"5. What was passed to mock: {mock_asyncio_run.call_args}")
# Check if coroutine is still "unawaited"
try:
# Try to generate the warning
import warnings
warnings.simplefilter('always', RuntimeWarning)
del coro # Deleting without closing should trigger warning
except:
pass
EOFRepository: CentML/centml-python-client
Length of output: 470
🏁 Script executed:
cat -n tests/test_cli_shell.py | sed -n '90,105p'Repository: CentML/centml-python-client
Length of output: 800
🏁 Script executed:
grep -A 3 "m\.asyncio\.run\|m\.build_ws_url\|m\.auth\|m\.settings" tests/test_cli_shell.py | head -40Repository: CentML/centml-python-client
Length of output: 1367
Mock the session functions directly, not just asyncio.run.
The _patch_deps helper patches asyncio.run but not interactive_session or exec_session, so the CLI still constructs real coroutine objects when shell() and exec_cmd() call these functions. This causes un-awaited coroutine warnings at runtime, and the tests cannot assert the actual URL, shell type, or command forwarded to the SDK. Mock interactive_session and exec_session directly (or explicitly close the coroutine in a fake run) to both eliminate the warnings and enable assertions on their arguments.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_cli_shell.py` around lines 43 - 68, The test helper _patch_deps
currently only patches asyncio.run, causing real coroutine objects from
interactive_session and exec_session to be created (leading to un-awaited
coroutine warnings and preventing assertions); update _patch_deps to also patch
centml.cli.shell.interactive_session and centml.cli.shell.exec_session (or have
the patched asyncio.run explicitly await/close a fake coroutine) so tests can
inspect calls and avoid warnings; ensure the patched names (interactive_session,
exec_session) return a simple mock/coroutine that records the URL, shell type,
and command used by shell() and exec_cmd() for assertions.
There was a problem hiding this comment.
♻️ Duplicate comments (2)
centml/cli/shell.py (2)
41-60:⚠️ Potential issue | 🟠 MajorGuard against prompting in non-interactive contexts.
exec_cmdcan be invoked non-interactively (CI, scripts), but_connect_argsmay call_select_podwhich usesclick.prompt(). This will hang or fail when multiple pods exist and--pod/--first-podis not specified. Consider requiring--podor--first-podwhen stdin is not a TTY, or defaulting to first pod in non-interactive mode.🛠️ Proposed fix
def _connect_args(deployment_id, pod, shell_type, first_pod=False): """Resolve pod, build WebSocket URL, and obtain auth token.""" with get_centml_client() as cclient: running_pods = get_running_pods(cclient, deployment_id) if not running_pods: raise click.ClickException(f"No running pods found for deployment {deployment_id}") if pod is not None: try: pod_name = _resolve_pod(running_pods, pod) except ShellError as exc: raise click.ClickException(str(exc)) from exc elif len(running_pods) == 1 or first_pod: pod_name = running_pods[0] + elif not sys.stdin.isatty(): + raise click.ClickException( + "Multiple pods found. Specify --pod or --first-pod for non-interactive use." + ) else: pod_name = _select_pod(running_pods, deployment_id)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@centml/cli/shell.py` around lines 41 - 60, The _connect_args function can block by calling _select_pod (which uses click.prompt) in non-interactive runs; fix by detecting interactivity (e.g. using sys.stdin.isatty()) and if not interactive and pod is None and not first_pod and len(running_pods) > 1, do not call _select_pod: instead raise a click.ClickException telling the user to provide --pod or --first-pod (or alternatively choose to default to running_pods[0] if you prefer non-interactive default behavior); implement this check inside _connect_args before calling _select_pod so build_ws_url and auth.get_centml_token remain unchanged.
80-92:⚠️ Potential issue | 🟠 MajorUse
shlex.join()to preserve command argument boundaries.
" ".join(command)loses quoting and escaping. Arguments like"hello world",$HOME, or*will be misinterpreted by the pod shell. Useshlex.join(command)for correct POSIX shell escaping.🛠️ Proposed fix
+import shlex ... def exec_cmd(deployment_id, command, pod, shell_type, first_pod): ws_url, token = _connect_args(deployment_id, pod, shell_type, first_pod) - exit_code = asyncio.run(exec_session(ws_url, token, " ".join(command))) + exit_code = asyncio.run(exec_session(ws_url, token, shlex.join(command))) sys.exit(exit_code)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@centml/cli/shell.py` around lines 80 - 92, The pod exec command concatenates arguments with " ".join(command) which loses quoting/escaping; change exec_cmd to construct the shell string with shlex.join(command) before calling exec_session so argument boundaries are preserved (update the call in exec_cmd that currently does asyncio.run(exec_session(ws_url, token, " ".join(command))) to use shlex.join(command)); ensure you import shlex at top of centml/cli/shell.py if not already present.
🧹 Nitpick comments (4)
centml/sdk/shell/__init__.py (1)
4-12: Consider sorting__all__for consistency.Ruff (RUF022) flags that
__all__is not sorted. Sorting improves maintainability and aligns with isort-style conventions.♻️ Proposed fix
__all__ = [ - "ShellError", "NoPodAvailableError", "PodNotFoundError", + "ShellError", "build_ws_url", + "exec_session", "get_running_pods", "interactive_session", - "exec_session", ]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@centml/sdk/shell/__init__.py` around lines 4 - 12, The __all__ list is unsorted (RUF022); sort the exported names alphabetically to satisfy Ruff and maintain consistency — reorder the entries in __all__ so identifiers like "NoPodAvailableError", "PodNotFoundError", "ShellError", "build_ws_url", "exec_session", "get_running_pods", "interactive_session" appear in ascending lexicographic order (update the __all__ variable in centml/sdk/shell/__init__.py where it's defined).tests/test_sdk_shell_session.py (1)
315-323: Store the task reference to prevent potential garbage collection.The
asyncio.create_task()return value is not stored, which could theoretically allow the task to be garbage collected before completion. While this test likely works in practice, storing the reference is the recommended pattern.♻️ Suggested fix
async def _run(): async def _set_shutdown(): await asyncio.sleep(0.1) shutdown.set() - asyncio.create_task(_set_shutdown()) + _ = asyncio.create_task(_set_shutdown()) return await forward_io(ws, [80, 24], shutdown)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_sdk_shell_session.py` around lines 315 - 323, The create_task call in the local coroutine _run uses asyncio.create_task(_set_shutdown()) without storing its return value, which risks the task being garbage-collected; assign the created Task to a local variable (e.g., shutdown_task or setter_task) inside _run so the reference lives at least until forward_io(ws, [80, 24], shutdown) completes (optionally await or let it finish naturally), keeping function names _run and _set_shutdown unchanged.centml/sdk/shell/session.py (2)
127-136: Consider narrowing the exception catch during task cleanup.The
except (asyncio.CancelledError, Exception)catch is broad and silently suppresses all exceptions from cancelled tasks. While this is common in cleanup code, it could mask unexpected errors. Consider catching specific expected exceptions or at minimum logging unexpected ones.♻️ Suggested improvement
for t in pending: try: await t - except (asyncio.CancelledError, Exception): - pass + except asyncio.CancelledError: + pass # Expected for cancelled tasks + except websockets.ConnectionClosed: + pass # Expected during shutdown + except Exception: + pass # Suppress other errors during cleanup🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@centml/sdk/shell/session.py` around lines 127 - 136, The cleanup loop currently swallows all exceptions with "except (asyncio.CancelledError, Exception): pass", which can hide real errors; update the loop that awaits tasks in "pending" to handle asyncio.CancelledError separately and to catch Exception as "e" and log it (e.g. using the module/session logger) instead of silencing it, so reference the variables pending, done, and t and replace the broad except with a specific asyncio.CancelledError branch and an Exception branch that logs the unexpected exception details.
164-168: Unhandled exceptions from fire-and-forget coroutine.
asyncio.ensure_future(ws.send(...))schedules the send but doesn't handle potential exceptions. If the WebSocket is closed when a resize signal arrives, this could produce unhandled exception warnings.♻️ Suggested fix with exception handling
def _send_resize(): c, r = shutil.get_terminal_size() term_size[0], term_size[1] = c, r - asyncio.ensure_future(ws.send(json.dumps({"operation": "resize", "rows": r, "cols": c}))) + async def _do_resize(): + try: + await ws.send(json.dumps({"operation": "resize", "rows": r, "cols": c})) + except websockets.ConnectionClosed: + pass # Connection already closed, ignore resize + asyncio.ensure_future(_do_resize())🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@centml/sdk/shell/session.py` around lines 164 - 168, The _send_resize closure currently calls asyncio.ensure_future(ws.send(...)) which can raise unhandled exceptions if the websocket is closed; change this to schedule a coroutine that catches and logs or ignores exceptions from ws.send. Create an async helper (e.g., async def _safe_send_resize(rows, cols): try: await ws.send(...); except Exception as e: process or debug-log the error) and call asyncio.create_task(_safe_send_resize(r, c)) inside _send_resize so resize failures are handled gracefully; reference _send_resize, ws.send, term_size, and the existing asyncio scheduling call when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@centml/cli/shell.py`:
- Around line 41-60: The _connect_args function can block by calling _select_pod
(which uses click.prompt) in non-interactive runs; fix by detecting
interactivity (e.g. using sys.stdin.isatty()) and if not interactive and pod is
None and not first_pod and len(running_pods) > 1, do not call _select_pod:
instead raise a click.ClickException telling the user to provide --pod or
--first-pod (or alternatively choose to default to running_pods[0] if you prefer
non-interactive default behavior); implement this check inside _connect_args
before calling _select_pod so build_ws_url and auth.get_centml_token remain
unchanged.
- Around line 80-92: The pod exec command concatenates arguments with "
".join(command) which loses quoting/escaping; change exec_cmd to construct the
shell string with shlex.join(command) before calling exec_session so argument
boundaries are preserved (update the call in exec_cmd that currently does
asyncio.run(exec_session(ws_url, token, " ".join(command))) to use
shlex.join(command)); ensure you import shlex at top of centml/cli/shell.py if
not already present.
---
Nitpick comments:
In `@centml/sdk/shell/__init__.py`:
- Around line 4-12: The __all__ list is unsorted (RUF022); sort the exported
names alphabetically to satisfy Ruff and maintain consistency — reorder the
entries in __all__ so identifiers like "NoPodAvailableError",
"PodNotFoundError", "ShellError", "build_ws_url", "exec_session",
"get_running_pods", "interactive_session" appear in ascending lexicographic
order (update the __all__ variable in centml/sdk/shell/__init__.py where it's
defined).
In `@centml/sdk/shell/session.py`:
- Around line 127-136: The cleanup loop currently swallows all exceptions with
"except (asyncio.CancelledError, Exception): pass", which can hide real errors;
update the loop that awaits tasks in "pending" to handle asyncio.CancelledError
separately and to catch Exception as "e" and log it (e.g. using the
module/session logger) instead of silencing it, so reference the variables
pending, done, and t and replace the broad except with a specific
asyncio.CancelledError branch and an Exception branch that logs the unexpected
exception details.
- Around line 164-168: The _send_resize closure currently calls
asyncio.ensure_future(ws.send(...)) which can raise unhandled exceptions if the
websocket is closed; change this to schedule a coroutine that catches and logs
or ignores exceptions from ws.send. Create an async helper (e.g., async def
_safe_send_resize(rows, cols): try: await ws.send(...); except Exception as e:
process or debug-log the error) and call
asyncio.create_task(_safe_send_resize(r, c)) inside _send_resize so resize
failures are handled gracefully; reference _send_resize, ws.send, term_size, and
the existing asyncio scheduling call when making the change.
In `@tests/test_sdk_shell_session.py`:
- Around line 315-323: The create_task call in the local coroutine _run uses
asyncio.create_task(_set_shutdown()) without storing its return value, which
risks the task being garbage-collected; assign the created Task to a local
variable (e.g., shutdown_task or setter_task) inside _run so the reference lives
at least until forward_io(ws, [80, 24], shutdown) completes (optionally await or
let it finish naturally), keeping function names _run and _set_shutdown
unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 43be36f8-9df6-44f3-8f76-1d891c236df8
📒 Files selected for processing (11)
centml/cli/main.pycentml/cli/shell.pycentml/sdk/api.pycentml/sdk/shell/__init__.pycentml/sdk/shell/exceptions.pycentml/sdk/shell/session.pyrequirements.txttests/conftest.pytests/test_cli_shell.pytests/test_sdk_shell_renderer.pytests/test_sdk_shell_session.py
Summary
centml cluster shell <id>for interactive terminal sessions (likedocker exec -it)centml cluster exec <id> -- <command>for running commands and returning output (likessh host "cmd")get_status_v3()to SDK client for pod discoverywebsockets>=13.0dependencySummary by CodeRabbit
New Features
shellandexeccluster subcommands for interactive and non-interactive pod terminal access.get_status_v3()API method for deployment status retrieval.Chores
Tests