Skip to content

add codesys hde2e demo: Dockerfile, PLC data bundles, configs, hde2e …#6

Open
mohitmeh12 wants to merge 1 commit intomainfrom
mm/codesys-hde2e
Open

add codesys hde2e demo: Dockerfile, PLC data bundles, configs, hde2e …#6
mohitmeh12 wants to merge 1 commit intomainfrom
mm/codesys-hde2e

Conversation

@mohitmeh12
Copy link
Collaborator

…orchestrator, and sudo/mount fixes

Description of the change

Detailed description here

Checks and balances

  • Tests added and run
  • (External) Interfaces documented
  • Has security implications (describe below)

Type of change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing
    functionality to not work as expected)

Related stories, issues and pulls

Links to Pivotal Tracker stories, JIRA tickets or other pull requests here

Security considerations

Describe any security impact this change may have

def _run_ssh_command(ssh: paramiko.SSHClient, command: str,
timeout: int = 120) -> int:
"""Run a command on a remote host via SSH. Returns exit code."""
print(f"[SSH] {command}")

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.

Copilot Autofix

AI 1 day ago

In general, commands that contain sensitive data such as passwords must not be logged verbatim. Either avoid including the secret in the command altogether (preferred), or redact/omit sensitive portions before logging. Also, piping passwords via shell command strings (printf ... | sudo -S) is fragile; using sudo’s standard input handling directly is safer and avoids embedding the password in the command text.

The best fix here is to change _sudo_cmd from returning a shell string that inlines the password to returning a tuple describing a safe command invocation: the command itself plus the password to be provided on stdin. Then, adjust the call sites that run commands to use subprocess.run/paramiko in a way that passes the password via stdin instead of embedding it in the command string. Specifically:

  • Change _sudo_cmd to return (sudo_command: str, password: str) rather than a single formatted string. The sudo_command will be something like "sudo -S <original cmd>" without the password inside it.
  • Update the local _run helper in the shim‑interface‑creation code so that when ssh is present it calls _run_ssh_command with two arguments: the sudo_command and the password. For local (non‑SSH) execution, use subprocess.run with input=password + "\n" and text=True to supply the password over stdin without logging it.
  • Update _run_ssh_command to accept an optional password parameter. When logging, only log the command without the password. When executing, call ssh.exec_command(command, timeout=timeout) and then write the password followed by a newline to _stdin if a password is provided. Crucially, the printed log line remains "[SSH] {command}", which no longer contains the secret.

These changes are all within src/hde2e.py in the shown regions: the _sudo_cmd definition (lines 62–68), _run_ssh_command (lines 99–112), and the _run helper and uses of sudo_cmd near lines 244–252.

Suggested changeset 1
src/hde2e.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/hde2e.py b/src/hde2e.py
--- a/src/hde2e.py
+++ b/src/hde2e.py
@@ -59,13 +59,17 @@
         self._sudo_password = pw
         return pw
 
-    def _sudo_cmd(self, cmd: str) -> str:
-        """Wrap *cmd* so it runs under ``sudo -S`` with the password
-        piped in.  This avoids interactive prompts that block automation.
+    def _sudo_cmd(self, cmd: str) -> (str, str):
+        """Return a tuple ``(sudo_command, password)`` for running *cmd*
+        under ``sudo -S``.
+
+        The password is returned separately so that caller code can pass it
+        via stdin instead of embedding it in a logged command string.
         """
         pw = self._resolve_sudo_password()
-        # Use printf to avoid issues with special chars in the password.
-        return f"printf '%s\\n' '{pw}' | sudo -S {cmd}"
+        sudo_command = f"sudo -S {cmd}"
+        return sudo_command, pw
+
     def _realtime_core_control(self, instance_num: str) -> Optional[str]:
         """Return a CPU core list string for the real-time Control PLC instance."""
         # Example mapping: Control_PLC_01 → cores 0-3, Control_PLC_02 → cores 4-7
@@ -97,11 +99,20 @@
 
     @staticmethod
     def _run_ssh_command(ssh: paramiko.SSHClient, command: str,
-                         timeout: int = 120) -> int:
-        """Run a command on a remote host via SSH. Returns exit code."""
+                         timeout: int = 120,
+                         password: Optional[str] = None) -> int:
+        """Run a command on a remote host via SSH. Returns exit code.
+
+        If *password* is provided, it is written to the command's stdin
+        (for example, to satisfy ``sudo -S``) but is never included in
+        the logged command string.
+        """
         print(f"[SSH] {command}")
         try:
             _stdin, stdout, stderr = ssh.exec_command(command, timeout=timeout)
+            if password is not None:
+                _stdin.write(password + "\n")
+                _stdin.flush()
             exit_status = stdout.channel.recv_exit_status()
             out = stdout.read().decode().strip()
             err = stderr.read().decode().strip()
@@ -244,12 +252,21 @@
         sudo_cmd = self._sudo_cmd
 
         def _run(cmd: str) -> int:
+            sudo_command, password = sudo_cmd(cmd)
             if ssh:
-                return DockerHDE2E._run_ssh_command(ssh, cmd)
-            return subprocess.run(cmd, shell=True, check=False).returncode
+                return DockerHDE2E._run_ssh_command(ssh, sudo_command, password=password)
+            # Local execution: pass password via stdin to sudo without logging it.
+            result = subprocess.run(
+                sudo_command,
+                shell=True,
+                check=False,
+                input=password + "\n",
+                text=True,
+            )
+            return result.returncode
 
         # Idempotent — skip if the interface already exists
-        if _run(sudo_cmd(f"ip link show {shim_name} 2>/dev/null")) == 0:
+        if _run(f"ip link show {shim_name} 2>/dev/null") == 0:
             print(f"[INFO] Shim interface '{shim_name}' already exists.")
             return 0
 
EOF
Copilot is powered by AI and may make mistakes. Always verify output.
port: int = 22) -> paramiko.SSHClient:
"""Open an SSH connection and return the client."""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

Check failure

Code scanning / CodeQL

Accepting unknown SSH host keys when using Paramiko High

Setting missing host key policy to AutoAddPolicy may be unsafe.

Copilot Autofix

AI 1 day ago

In general, the problem is fixed by not auto-accepting unknown SSH host keys. Instead, rely on Paramiko’s default RejectPolicy or explicitly set it, and ensure that the remote host’s key is known via a known_hosts file or explicit host key configuration. On connection, if the host key is unknown or mismatched, Paramiko will raise an exception, preventing potentially compromised sessions.

For this specific code, the minimal, functionality-preserving change is to stop using AutoAddPolicy and instead use RejectPolicy. This maintains almost all existing behavior (same API, same connection logic) but causes unknown/mismatched host keys to fail rather than be silently trusted. We only need to adjust _ssh_connect in src/hde2e.py:

  • Keep creating paramiko.SSHClient() as before.
  • Replace ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) with ssh.set_missing_host_key_policy(paramiko.RejectPolicy()).
  • No import changes are needed because the code already imports paramiko at the module level and accesses policies as attributes of that module.

Concretely, in src/hde2e.py around line 121–124, we will modify the set_missing_host_key_policy call accordingly and leave the rest of _ssh_connect unchanged.

Suggested changeset 1
src/hde2e.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/hde2e.py b/src/hde2e.py
--- a/src/hde2e.py
+++ b/src/hde2e.py
@@ -119,7 +119,7 @@
                      port: int = 22) -> paramiko.SSHClient:
         """Open an SSH connection and return the client."""
         ssh = paramiko.SSHClient()
-        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
         print(f"[SSH] Connecting to {user}@{host}:{port} …")
         ssh.connect(hostname=host, username=user, password=password, port=port)
         return ssh
EOF
@@ -119,7 +119,7 @@
port: int = 22) -> paramiko.SSHClient:
"""Open an SSH connection and return the client."""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
print(f"[SSH] Connecting to {user}@{host}:{port} …")
ssh.connect(hostname=host, username=user, password=password, port=port)
return ssh
Copilot is powered by AI and may make mistakes. Always verify output.
]
for cmd in cmds:
if _run(cmd) != 0:
print(f"[ERROR] Failed to create macvlan shim: {cmd}")

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.

Copilot Autofix

AI 1 day ago

In general, the issue should be fixed by ensuring that any log messages do not contain secrets such as passwords or other credentials. When you must log an operation that involves sensitive data, log only non-sensitive details (e.g., the original command without the injected password, or a redacted version) and/or generic error text.

In this specific case, the best fix is to avoid including the output of _sudo_cmd (which embeds the password) in logs. Instead, the code can log the underlying command that would be run with sudo (without the password), or log only the shim name / operation. To keep behaviour unchanged, we should keep using _sudo_cmd to build the password-piped command for execution, but adjust the error logging to either (a) omit cmd entirely, or (b) log a non-sensitive representation of the action. A simple, low-risk fix is to change the error print from Failed to create macvlan shim: {cmd} to something like Failed to create macvlan shim '{shim_name}' (step: ip link add) and, if we still want to show the original non-sudo command, we can reconstruct or pre-store it before wrapping with _sudo_cmd.

Given the minimal snippet, the smallest safe change is just to stop logging cmd in the loop in _macvlan_shim_create. That requires editing the print call at line 265 to remove the interpolated cmd value. No new imports or helper methods are strictly necessary. If we want to maintain some debugging value, we can add a static description or index of the step, which is not sensitive.

Suggested changeset 1
src/hde2e.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/hde2e.py b/src/hde2e.py
--- a/src/hde2e.py
+++ b/src/hde2e.py
@@ -262,7 +262,7 @@
         ]
         for cmd in cmds:
             if _run(cmd) != 0:
-                print(f"[ERROR] Failed to create macvlan shim: {cmd}")
+                print(f"[ERROR] Failed to create macvlan shim '{shim_name}'.")
                 return 1
 
         print(f"[INFO] Macvlan shim '{shim_name}' up at {shim_ip}")
EOF
@@ -262,7 +262,7 @@
]
for cmd in cmds:
if _run(cmd) != 0:
print(f"[ERROR] Failed to create macvlan shim: {cmd}")
print(f"[ERROR] Failed to create macvlan shim '{shim_name}'.")
return 1

print(f"[INFO] Macvlan shim '{shim_name}' up at {shim_ip}")
Copilot is powered by AI and may make mistakes. Always verify output.
f"rm -f {remote_bundle}",
]:
if self._run_ssh_command(ssh, cmd) != 0:
print(f"[ERROR] Failed: {cmd}")

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.

Copilot Autofix

AI 1 day ago

In general, to fix clear-text logging of sensitive information, you must ensure that any values possibly containing secrets (passwords, tokens, keys) are never included verbatim in log messages. Instead, either (a) omit them entirely, (b) replace them with redacted placeholders, or (c) log only non-sensitive derived data (e.g., command names or IDs rather than full command strings).

For this specific code, the safest fix without changing overall behavior is to avoid logging the full cmd string when a command fails, because cmd can contain the sudo password. We still want useful diagnostics, so we can log a sanitized description—e.g., the base command without the password, or a static message. Since we must not assume additional context from other files, the minimal and robust change is to remove cmd from the error message in _transfer_and_extract_bundle and replace it with a generic message stating that a remote command failed. This ensures no password-bearing string is printed while preserving the control flow and return behavior.

Concretely:

  • In src/hde2e.py, within _transfer_and_extract_bundle, change the print(f"[ERROR] Failed: {cmd}") line to avoid interpolating cmd. For instance, print [ERROR] Failed to execute remote command or similar.
  • No new imports or helper methods are required; we only modify that single print call.
  • This single change addresses all alert variants, since they all trace to the same sink (print with cmd).

Suggested changeset 1
src/hde2e.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/hde2e.py b/src/hde2e.py
--- a/src/hde2e.py
+++ b/src/hde2e.py
@@ -486,7 +486,7 @@
             f"rm -f {remote_bundle}",
         ]:
             if self._run_ssh_command(ssh, cmd) != 0:
-                print(f"[ERROR] Failed: {cmd}")
+                print("[ERROR] Failed to execute remote command during bundle transfer")
                 return 1
         return 0
 
EOF
@@ -486,7 +486,7 @@
f"rm -f {remote_bundle}",
]:
if self._run_ssh_command(ssh, cmd) != 0:
print(f"[ERROR] Failed: {cmd}")
print("[ERROR] Failed to execute remote command during bundle transfer")
return 1
return 0

Copilot is powered by AI and may make mistakes. Always verify output.
raise ValueError(
f"Unsafe path detected in tar archive entry: {member.name!r}"
)
tar_obj.extractall(path)

Check failure

Code scanning / CodeQL

Arbitrary file write during tarfile extraction High

This file extraction depends on a
potentially untrusted source
.

Copilot Autofix

AI 1 day ago

In general, to fix this kind of problem, we must ensure that every path derived from the tar archive is validated before writing to disk and avoid passing the untrusted tar object directly to extractall. Instead, iterate through members, validate each member.name (rejecting absolute paths and ".." components) and then extract only those safe members, using a controlled target path.

For this snippet, the best fix while preserving behavior is to replace the custom _is_within_directory / _safe_extract plus extractall call with a standard, conservative safe-extraction routine that (1) rejects absolute paths and paths containing "..", and (2) extracts each member individually via tar.extract(member, path=mount_base). This keeps semantics close to the original extractall(mount_base) but ensures no member can escape mount_base. We also no longer need _is_within_directory, simplifying the code. Concretely, within src/hde2e.py at the with tarfile.open(bundle, "r:gz") as tar: block (around lines 593–613), we will remove _is_within_directory and the extractall call and instead introduce a _safe_extract that validates member.name using os.path.isabs and checking for ".." path components, then calls tar_obj.extract(member, path=path) only for safe members.

Suggested changeset 1
src/hde2e.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/hde2e.py b/src/hde2e.py
--- a/src/hde2e.py
+++ b/src/hde2e.py
@@ -594,21 +594,15 @@
                 # Safely extract the tar archive, ensuring all members stay
                 # within the intended mount_base directory to avoid
                 # directory traversal and arbitrary file writes.
-                def _is_within_directory(base_dir: str, target_path: str) -> bool:
-                    base_dir_abs = os.path.abspath(base_dir)
-                    target_abs = os.path.abspath(target_path)
-                    return os.path.commonpath([base_dir_abs]) == os.path.commonpath(
-                        [base_dir_abs, target_abs]
-                    )
-
                 def _safe_extract(tar_obj: tarfile.TarFile, path: str) -> None:
                     for member in tar_obj.getmembers():
-                        member_path = os.path.join(path, member.name)
-                        if not _is_within_directory(path, member_path):
+                        member_name = member.name
+                        # Reject absolute paths and any path containing ".."
+                        if os.path.isabs(member_name) or ".." in member_name.split("/"):
                             raise ValueError(
-                                f"Unsafe path detected in tar archive entry: {member.name!r}"
+                                f"Unsafe path detected in tar archive entry: {member_name!r}"
                             )
-                    tar_obj.extractall(path)
+                        tar_obj.extract(member, path=path)
 
                 _safe_extract(tar, mount_base)
 
EOF
@@ -594,21 +594,15 @@
# Safely extract the tar archive, ensuring all members stay
# within the intended mount_base directory to avoid
# directory traversal and arbitrary file writes.
def _is_within_directory(base_dir: str, target_path: str) -> bool:
base_dir_abs = os.path.abspath(base_dir)
target_abs = os.path.abspath(target_path)
return os.path.commonpath([base_dir_abs]) == os.path.commonpath(
[base_dir_abs, target_abs]
)

def _safe_extract(tar_obj: tarfile.TarFile, path: str) -> None:
for member in tar_obj.getmembers():
member_path = os.path.join(path, member.name)
if not _is_within_directory(path, member_path):
member_name = member.name
# Reject absolute paths and any path containing ".."
if os.path.isabs(member_name) or ".." in member_name.split("/"):
raise ValueError(
f"Unsafe path detected in tar archive entry: {member.name!r}"
f"Unsafe path detected in tar archive entry: {member_name!r}"
)
tar_obj.extractall(path)
tar_obj.extract(member, path=path)

_safe_extract(tar, mount_base)

Copilot is powered by AI and may make mistakes. Always verify output.
@uncleDecart
Copy link
Member

Hey @mohitmeh12, so in order to test it, I need to switch to demo mode in config, and run it as sudo, i.e. sudo .venv/bin/python3 main.py, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants