add codesys hde2e demo: Dockerfile, PLC data bundles, configs, hde2e …#6
add codesys hde2e demo: Dockerfile, PLC data bundles, configs, hde2e …#6mohitmeh12 wants to merge 1 commit intomainfrom
Conversation
| def _run_ssh_command(ssh: paramiko.SSHClient, command: str, | ||
| timeout: int = 120) -> int: | ||
| """Run a command on a remote host via SSH. Returns exit code.""" | ||
| print(f"[SSH] {command}") |
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 1 day ago
In general, commands that contain sensitive data such as passwords must not be logged verbatim. Either avoid including the secret in the command altogether (preferred), or redact/omit sensitive portions before logging. Also, piping passwords via shell command strings (printf ... | sudo -S) is fragile; using sudo’s standard input handling directly is safer and avoids embedding the password in the command text.
The best fix here is to change _sudo_cmd from returning a shell string that inlines the password to returning a tuple describing a safe command invocation: the command itself plus the password to be provided on stdin. Then, adjust the call sites that run commands to use subprocess.run/paramiko in a way that passes the password via stdin instead of embedding it in the command string. Specifically:
- Change
_sudo_cmdto return(sudo_command: str, password: str)rather than a single formatted string. Thesudo_commandwill be something like"sudo -S <original cmd>"without the password inside it. - Update the local
_runhelper in the shim‑interface‑creation code so that whensshis present it calls_run_ssh_commandwith two arguments: thesudo_commandand the password. For local (non‑SSH) execution, usesubprocess.runwithinput=password + "\n"andtext=Trueto supply the password over stdin without logging it. - Update
_run_ssh_commandto accept an optionalpasswordparameter. When logging, only log the command without the password. When executing, callssh.exec_command(command, timeout=timeout)and then write the password followed by a newline to_stdinif a password is provided. Crucially, the printed log line remains"[SSH] {command}", which no longer contains the secret.
These changes are all within src/hde2e.py in the shown regions: the _sudo_cmd definition (lines 62–68), _run_ssh_command (lines 99–112), and the _run helper and uses of sudo_cmd near lines 244–252.
| port: int = 22) -> paramiko.SSHClient: | ||
| """Open an SSH connection and return the client.""" | ||
| ssh = paramiko.SSHClient() | ||
| ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
Check failure
Code scanning / CodeQL
Accepting unknown SSH host keys when using Paramiko High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 1 day ago
In general, the problem is fixed by not auto-accepting unknown SSH host keys. Instead, rely on Paramiko’s default RejectPolicy or explicitly set it, and ensure that the remote host’s key is known via a known_hosts file or explicit host key configuration. On connection, if the host key is unknown or mismatched, Paramiko will raise an exception, preventing potentially compromised sessions.
For this specific code, the minimal, functionality-preserving change is to stop using AutoAddPolicy and instead use RejectPolicy. This maintains almost all existing behavior (same API, same connection logic) but causes unknown/mismatched host keys to fail rather than be silently trusted. We only need to adjust _ssh_connect in src/hde2e.py:
- Keep creating
paramiko.SSHClient()as before. - Replace
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())withssh.set_missing_host_key_policy(paramiko.RejectPolicy()). - No import changes are needed because the code already imports
paramikoat the module level and accesses policies as attributes of that module.
Concretely, in src/hde2e.py around line 121–124, we will modify the set_missing_host_key_policy call accordingly and leave the rest of _ssh_connect unchanged.
| @@ -119,7 +119,7 @@ | ||
| port: int = 22) -> paramiko.SSHClient: | ||
| """Open an SSH connection and return the client.""" | ||
| ssh = paramiko.SSHClient() | ||
| ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | ||
| ssh.set_missing_host_key_policy(paramiko.RejectPolicy()) | ||
| print(f"[SSH] Connecting to {user}@{host}:{port} …") | ||
| ssh.connect(hostname=host, username=user, password=password, port=port) | ||
| return ssh |
| ] | ||
| for cmd in cmds: | ||
| if _run(cmd) != 0: | ||
| print(f"[ERROR] Failed to create macvlan shim: {cmd}") |
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 1 day ago
In general, the issue should be fixed by ensuring that any log messages do not contain secrets such as passwords or other credentials. When you must log an operation that involves sensitive data, log only non-sensitive details (e.g., the original command without the injected password, or a redacted version) and/or generic error text.
In this specific case, the best fix is to avoid including the output of _sudo_cmd (which embeds the password) in logs. Instead, the code can log the underlying command that would be run with sudo (without the password), or log only the shim name / operation. To keep behaviour unchanged, we should keep using _sudo_cmd to build the password-piped command for execution, but adjust the error logging to either (a) omit cmd entirely, or (b) log a non-sensitive representation of the action. A simple, low-risk fix is to change the error print from Failed to create macvlan shim: {cmd} to something like Failed to create macvlan shim '{shim_name}' (step: ip link add) and, if we still want to show the original non-sudo command, we can reconstruct or pre-store it before wrapping with _sudo_cmd.
Given the minimal snippet, the smallest safe change is just to stop logging cmd in the loop in _macvlan_shim_create. That requires editing the print call at line 265 to remove the interpolated cmd value. No new imports or helper methods are strictly necessary. If we want to maintain some debugging value, we can add a static description or index of the step, which is not sensitive.
| @@ -262,7 +262,7 @@ | ||
| ] | ||
| for cmd in cmds: | ||
| if _run(cmd) != 0: | ||
| print(f"[ERROR] Failed to create macvlan shim: {cmd}") | ||
| print(f"[ERROR] Failed to create macvlan shim '{shim_name}'.") | ||
| return 1 | ||
|
|
||
| print(f"[INFO] Macvlan shim '{shim_name}' up at {shim_ip}") |
| f"rm -f {remote_bundle}", | ||
| ]: | ||
| if self._run_ssh_command(ssh, cmd) != 0: | ||
| print(f"[ERROR] Failed: {cmd}") |
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 1 day ago
In general, to fix clear-text logging of sensitive information, you must ensure that any values possibly containing secrets (passwords, tokens, keys) are never included verbatim in log messages. Instead, either (a) omit them entirely, (b) replace them with redacted placeholders, or (c) log only non-sensitive derived data (e.g., command names or IDs rather than full command strings).
For this specific code, the safest fix without changing overall behavior is to avoid logging the full cmd string when a command fails, because cmd can contain the sudo password. We still want useful diagnostics, so we can log a sanitized description—e.g., the base command without the password, or a static message. Since we must not assume additional context from other files, the minimal and robust change is to remove cmd from the error message in _transfer_and_extract_bundle and replace it with a generic message stating that a remote command failed. This ensures no password-bearing string is printed while preserving the control flow and return behavior.
Concretely:
- In
src/hde2e.py, within_transfer_and_extract_bundle, change theprint(f"[ERROR] Failed: {cmd}")line to avoid interpolatingcmd. For instance, print[ERROR] Failed to execute remote commandor similar. - No new imports or helper methods are required; we only modify that single
printcall. - This single change addresses all alert variants, since they all trace to the same sink (
printwithcmd).
| @@ -486,7 +486,7 @@ | ||
| f"rm -f {remote_bundle}", | ||
| ]: | ||
| if self._run_ssh_command(ssh, cmd) != 0: | ||
| print(f"[ERROR] Failed: {cmd}") | ||
| print("[ERROR] Failed to execute remote command during bundle transfer") | ||
| return 1 | ||
| return 0 | ||
|
|
| raise ValueError( | ||
| f"Unsafe path detected in tar archive entry: {member.name!r}" | ||
| ) | ||
| tar_obj.extractall(path) |
Check failure
Code scanning / CodeQL
Arbitrary file write during tarfile extraction High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 1 day ago
In general, to fix this kind of problem, we must ensure that every path derived from the tar archive is validated before writing to disk and avoid passing the untrusted tar object directly to extractall. Instead, iterate through members, validate each member.name (rejecting absolute paths and ".." components) and then extract only those safe members, using a controlled target path.
For this snippet, the best fix while preserving behavior is to replace the custom _is_within_directory / _safe_extract plus extractall call with a standard, conservative safe-extraction routine that (1) rejects absolute paths and paths containing "..", and (2) extracts each member individually via tar.extract(member, path=mount_base). This keeps semantics close to the original extractall(mount_base) but ensures no member can escape mount_base. We also no longer need _is_within_directory, simplifying the code. Concretely, within src/hde2e.py at the with tarfile.open(bundle, "r:gz") as tar: block (around lines 593–613), we will remove _is_within_directory and the extractall call and instead introduce a _safe_extract that validates member.name using os.path.isabs and checking for ".." path components, then calls tar_obj.extract(member, path=path) only for safe members.
| @@ -594,21 +594,15 @@ | ||
| # Safely extract the tar archive, ensuring all members stay | ||
| # within the intended mount_base directory to avoid | ||
| # directory traversal and arbitrary file writes. | ||
| def _is_within_directory(base_dir: str, target_path: str) -> bool: | ||
| base_dir_abs = os.path.abspath(base_dir) | ||
| target_abs = os.path.abspath(target_path) | ||
| return os.path.commonpath([base_dir_abs]) == os.path.commonpath( | ||
| [base_dir_abs, target_abs] | ||
| ) | ||
|
|
||
| def _safe_extract(tar_obj: tarfile.TarFile, path: str) -> None: | ||
| for member in tar_obj.getmembers(): | ||
| member_path = os.path.join(path, member.name) | ||
| if not _is_within_directory(path, member_path): | ||
| member_name = member.name | ||
| # Reject absolute paths and any path containing ".." | ||
| if os.path.isabs(member_name) or ".." in member_name.split("/"): | ||
| raise ValueError( | ||
| f"Unsafe path detected in tar archive entry: {member.name!r}" | ||
| f"Unsafe path detected in tar archive entry: {member_name!r}" | ||
| ) | ||
| tar_obj.extractall(path) | ||
| tar_obj.extract(member, path=path) | ||
|
|
||
| _safe_extract(tar, mount_base) | ||
|
|
…orchestrator, and sudo/mount fixes
82d86c6 to
c6b57a4
Compare
|
Hey @mohitmeh12, so in order to test it, I need to switch to demo mode in config, and run it as sudo, i.e. |
…orchestrator, and sudo/mount fixes
Description of the change
Checks and balances
Type of change
functionality to not work as expected)
Related stories, issues and pulls
Security considerations