From 82a6585349717c27c8dda4cafa62ea9980d736f1 Mon Sep 17 00:00:00 2001 From: Christian Berendt Date: Sun, 15 Mar 2026 19:22:45 +0100 Subject: [PATCH] Fix known_hosts race condition and command injection in console subprocess calls Use per-invocation known_hosts files for clush to avoid race conditions with concurrent SSH connections. Replace shell=True subprocess calls with list form to prevent command injection, and apply shlex.quote() to user-controlled values in remote shell commands. AI-assisted: Claude Code Signed-off-by: Christian Berendt --- osism/commands/console.py | 85 ++++++++++++++++++++++++++++++++------- 1 file changed, 70 insertions(+), 15 deletions(-) diff --git a/osism/commands/console.py b/osism/commands/console.py index c3d03170..8d5a99ed 100644 --- a/osism/commands/console.py +++ b/osism/commands/console.py @@ -1,8 +1,12 @@ # SPDX-License-Identifier: Apache-2.0 import json +import os +import shlex +import shutil import socket import subprocess +import tempfile from typing import Optional from cliff.command import Command @@ -196,15 +200,40 @@ def take_action(self, parsed_args): type_console = "clush" host = host[1:] - ssh_options = f"-o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}" + ssh_options = [ + "-o", + "StrictHostKeyChecking=no", + "-o", + "LogLevel=ERROR", + "-o", + f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", + ] if type_console == "ansible": - subprocess.call(f"/run-ansible-console.sh {host}", shell=True) + subprocess.call(["/run-ansible-console.sh", host]) elif type_console == "clush": - subprocess.call( - f"/usr/local/bin/clush -l {settings.OPERATOR_USER} -g {host}", - shell=True, - ) + # Create a per-invocation known_hosts file to avoid race conditions + # with fanout:64 concurrent SSH connections while still persisting + # host keys during the session. + fd, tmp_known_hosts = tempfile.mkstemp(prefix="clush_known_hosts_") + try: + os.close(fd) + if os.path.exists(KNOWN_HOSTS_PATH): + shutil.copy2(KNOWN_HOSTS_PATH, tmp_known_hosts) + subprocess.call( + [ + "/usr/local/bin/clush", + "-l", + settings.OPERATOR_USER, + "-o", + f"-o UserKnownHostsFile={tmp_known_hosts}", + "-g", + host, + ] + ) + finally: + if os.path.exists(tmp_known_hosts): + os.unlink(tmp_known_hosts) elif type_console == "ssh": # Try to resolve as an inventory group group_hosts = get_hosts_from_group(host) @@ -221,8 +250,13 @@ def take_action(self, parsed_args): resolved_host = resolve_host_with_fallback(host) # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable subprocess.call( - f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{resolved_host}", - shell=True, + [ + "/usr/bin/ssh", + "-i", + "/ansible/secrets/id_rsa.operator", + *ssh_options, + f"{settings.OPERATOR_USER}@{resolved_host}", + ] ) elif type_console == "container_prompt": while True: @@ -230,26 +264,47 @@ def take_action(self, parsed_args): if command in ["Exit", "exit", "EXIT"]: break - ssh_command = f"docker {command}" + ssh_command = f"docker {shlex.quote(command)}" # Resolve hostname with Netbox fallback resolved_host = resolve_host_with_fallback(host[:-1]) # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable subprocess.call( - f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{resolved_host} {ssh_command}", - shell=True, + [ + "/usr/bin/ssh", + "-i", + "/ansible/secrets/id_rsa.operator", + *ssh_options, + f"{settings.OPERATOR_USER}@{resolved_host}", + ssh_command, + ] ) elif type_console == "container": target_containername = host.split("/")[1] target_host = host.split("/")[0] target_command = "bash" - ssh_command = f"docker exec -it {target_containername} {target_command}" - ssh_options = f"-o RequestTTY=force -o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}" + ssh_command = f"docker exec -it {shlex.quote(target_containername)} {shlex.quote(target_command)}" + ssh_options = [ + "-o", + "RequestTTY=force", + "-o", + "StrictHostKeyChecking=no", + "-o", + "LogLevel=ERROR", + "-o", + f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", + ] # Resolve hostname with Netbox fallback resolved_target_host = resolve_host_with_fallback(target_host) # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable subprocess.call( - f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{resolved_target_host} {ssh_command}", - shell=True, + [ + "/usr/bin/ssh", + "-i", + "/ansible/secrets/id_rsa.operator", + *ssh_options, + f"{settings.OPERATOR_USER}@{resolved_target_host}", + ssh_command, + ] )