From e6c8e4f1a9d20c15a9fcc00eafe52833ce9b032d Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Thu, 8 Jan 2026 11:12:39 +0100 Subject: [PATCH 01/20] type annotations in IOConnection --- moler/io/io_connection.py | 66 +++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/moler/io/io_connection.py b/moler/io/io_connection.py index 8f9f4e13f..bcc5c3776 100644 --- a/moler/io/io_connection.py +++ b/moler/io/io_connection.py @@ -29,18 +29,20 @@ """ __author__ = "Grzegorz Latuszek, Marcin Usielski" -__copyright__ = "Copyright (C) 2018-2022, Nokia" +__copyright__ = "Copyright (C) 2018-2026, Nokia" __email__ = "grzegorz.latuszek@nokia.com, marcin.usielski@nokia.com" import contextlib import logging from threading import Lock +from moler.connection import Connection +from typing import Callable, List class IOConnection: """External-IO connection.""" - def __init__(self, moler_connection): + def __init__(self, moler_connection: Connection): """ Specific constructor of external-IO should store information how to establish connection (like host/port info) @@ -48,22 +50,22 @@ def __init__(self, moler_connection): :param moler_connection: object of abstract class moler.connection.Connection """ super(IOConnection, self).__init__() - self._connect_subscribers = [] + self._connect_subscribers :List[Callable] = [] self._connect_subscribers_lock = Lock() - self._disconnect_subscribers = [] + self._disconnect_subscribers :List[Callable] = [] self._disconnect_subscribers_lock = Lock() - self.moler_connection = moler_connection - self.__name = "UNNAMED_IO_CONNECTION" + self.moler_connection: Connection = moler_connection + self.__name: str = "UNNAMED_IO_CONNECTION" self.logger = logging.getLogger(f"moler.connection.{self.__name}.io") # plugin the way we output data to external world self.moler_connection.how2send = self.send @property - def name(self): + def name(self) -> str: return self.__name @name.setter - def name(self, value): + def name(self, value: str) -> None: self.__name = value self.logger = logging.getLogger(f"moler.connection.{self.__name}.io") @@ -88,7 +90,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False # reraise exceptions if any - def send(self, data): + def send(self, data: bytes) -> None: """ Send data bytes over external-IO. @@ -96,19 +98,15 @@ def send(self, data): by moler_connection when it needs. """ - def receive(self): - """ - Pull data bytes from external-IO: - - data = io_connection.receive() - - and then forward it to Moler's connection: - - self.moler_connection.data_received(data) - - """ + # def receive(self) -> bytes: + # """ + # Pull data bytes from external-IO: + # data = io_connection.receive() + # and then forward it to Moler's connection: + # self.moler_connection.data_received(data) + # """ - def data_received(self, data, recv_time): + def data_received(self, data: bytes, recv_time: float) -> None: """ Having been given data bytes from external-IO: @@ -116,7 +114,7 @@ def data_received(self, data, recv_time): """ self.moler_connection.data_received(data=data, recv_time=recv_time) - def notify(self, callback, when): + def notify(self, callback: Callable, when: str) -> None: """ Adds subscriber to list of functions to call :param callback: reference to function to call when connection is open/established @@ -128,9 +126,9 @@ def notify(self, callback, when): elif when == "connection_lost": self.subscribe_on_connection_lost(subscriber=callback) - def subscribe_on_connection_made(self, subscriber): + def subscribe_on_connection_made(self, subscriber: Callable) -> None: """ - Adds subscriber to list of functions to call when connection is open/established (also reopen after close) + Adds subscriber to list of functions to call when conne`ction is open/established (also reopen after close) :param subscriber: reference to function to call when connection is open/established :return: None """ @@ -138,7 +136,7 @@ def subscribe_on_connection_made(self, subscriber): self._connect_subscribers_lock, self._connect_subscribers, subscriber ) - def subscribe_on_connection_lost(self, subscriber): + def subscribe_on_connection_lost(self, subscriber: Callable): """ Adds subscriber to list of functions to call when connection is closed/disconnected :param subscriber: reference to function to call when connection is closed/disconnected @@ -148,7 +146,7 @@ def subscribe_on_connection_lost(self, subscriber): self._disconnect_subscribers_lock, self._disconnect_subscribers, subscriber ) - def unsubscribe_on_connection_made(self, subscriber): + def unsubscribe_on_connection_made(self, subscriber: Callable) -> None: """ Remove subscriber from list of functions to call when connection is open/established (also reopen after close) :param subscriber: reference to function registered by method subscribe_on_connection_made @@ -158,7 +156,7 @@ def unsubscribe_on_connection_made(self, subscriber): self._connect_subscribers_lock, self._connect_subscribers, subscriber ) - def unsubscribe_on_connection_lost(self, subscriber): + def unsubscribe_on_connection_lost(self, subscriber: Callable) -> None: """ Remove subscriber from list of functions to call when connection is closed/disconnected :param subscriber: reference to function registered by method subscribe_on_connection_lost @@ -168,45 +166,45 @@ def unsubscribe_on_connection_lost(self, subscriber): self._disconnect_subscribers_lock, self._disconnect_subscribers, subscriber ) - def disable_logging(self): + def disable_logging(self) -> None: """ Disable logging incoming data. :return: None """ self.moler_connection.disable_logging() - def enable_logging(self): + def enable_logging(self) -> None: """ Enable logging incoming data. :return: None """ self.moler_connection.enable_logging() - def _notify(self, lock, subscribers): + def _notify(self, lock: Lock, subscribers: list[Callable]) -> None: with lock: copied_subscribers = subscribers[:] for subscriber in copied_subscribers: subscriber(self) - def _notify_on_connect(self): + def _notify_on_connect(self) -> None: self.logger.info( msg=f"Connection to: '{self.name}' has been opened.", extra={"log_name": self.name}, ) self._notify(self._connect_subscribers_lock, self._connect_subscribers) - def _notify_on_disconnect(self): + def _notify_on_disconnect(self) -> None: self.logger.info( msg=f"Connection to: '{self.name}' has been closed.", extra={"log_name": self.name}, ) self._notify(self._disconnect_subscribers_lock, self._disconnect_subscribers) - def _subscribe(self, lock, subscribers, subscriber): + def _subscribe(self, lock: Lock, subscribers: List[Callable], subscriber: Callable) -> None: with lock: if subscriber not in subscribers: subscribers.append(subscriber) - def _unsubscribe(self, lock, subscribers, subscriber): + def _unsubscribe(self, lock: Lock, subscribers: List[Callable], subscriber: Callable) -> None: with lock: subscribers.remove(subscriber) From e4ccbf1ed799e845d455bfd524b7f67be8608c43 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Thu, 8 Jan 2026 11:16:29 +0100 Subject: [PATCH 02/20] list -> List --- moler/io/io_connection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/moler/io/io_connection.py b/moler/io/io_connection.py index bcc5c3776..de72faf29 100644 --- a/moler/io/io_connection.py +++ b/moler/io/io_connection.py @@ -50,9 +50,9 @@ def __init__(self, moler_connection: Connection): :param moler_connection: object of abstract class moler.connection.Connection """ super(IOConnection, self).__init__() - self._connect_subscribers :List[Callable] = [] + self._connect_subscribers: List[Callable] = [] self._connect_subscribers_lock = Lock() - self._disconnect_subscribers :List[Callable] = [] + self._disconnect_subscribers: List[Callable] = [] self._disconnect_subscribers_lock = Lock() self.moler_connection: Connection = moler_connection self.__name: str = "UNNAMED_IO_CONNECTION" @@ -180,7 +180,7 @@ def enable_logging(self) -> None: """ self.moler_connection.enable_logging() - def _notify(self, lock: Lock, subscribers: list[Callable]) -> None: + def _notify(self, lock: Lock, subscribers: List[Callable]) -> None: with lock: copied_subscribers = subscribers[:] for subscriber in copied_subscribers: From 7cf68d8251d70c5f17fb1db4ccc2226793a7770c Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Thu, 8 Jan 2026 13:29:33 +0100 Subject: [PATCH 03/20] typing --- moler/io/io_connection.py | 7 +++--- moler/io/raw/terminal.py | 45 ++++++++++++++++++++++----------------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/moler/io/io_connection.py b/moler/io/io_connection.py index de72faf29..52028caf9 100644 --- a/moler/io/io_connection.py +++ b/moler/io/io_connection.py @@ -37,6 +37,7 @@ from threading import Lock from moler.connection import Connection from typing import Callable, List +from datetime import datetime class IOConnection: @@ -69,7 +70,7 @@ def name(self, value: str) -> None: self.__name = value self.logger = logging.getLogger(f"moler.connection.{self.__name}.io") - def open(self): + def open(self) -> contextlib.closing: """ Take 'how to establish connection' info from constructor and open that connection. @@ -90,7 +91,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False # reraise exceptions if any - def send(self, data: bytes) -> None: + def send(self, data: str) -> None: """ Send data bytes over external-IO. @@ -106,7 +107,7 @@ def send(self, data: bytes) -> None: # self.moler_connection.data_received(data) # """ - def data_received(self, data: bytes, recv_time: float) -> None: + def data_received(self, data: str, recv_time: datetime) -> None: """ Having been given data bytes from external-IO: diff --git a/moler/io/raw/terminal.py b/moler/io/raw/terminal.py index 8f97d78c2..8f38d63dd 100644 --- a/moler/io/raw/terminal.py +++ b/moler/io/raw/terminal.py @@ -4,6 +4,7 @@ __email__ = "michal.ernst@nokia.com, marcin.usielski@nokia.com, tomasz.krol@nokia.com" import codecs +import contextlib import re import select import datetime @@ -19,6 +20,8 @@ from moler.helpers import all_chars_to_hex from moler.helpers import non_printable_chars_to_hex from moler.util import tracked_thread +from moler.connection import Connection +from typing import Tuple, List class ThreadedTerminal(IOConnection): @@ -30,15 +33,15 @@ class ThreadedTerminal(IOConnection): def __init__( self, - moler_connection, - cmd="/bin/bash", - select_timeout=0.002, - read_buffer_size=4096, - first_prompt=r"[%$#\]]+", - target_prompt=r"moler_bash#", - set_prompt_cmd='unset PROMPT_COMMAND; export PS1="moler_bash# "\n', - dimensions=(100, 300), - terminal_delayafterclose=0.2, + moler_connection: Connection, + cmd: str = "/bin/bash", + select_timeout: float = 0.002, + read_buffer_size: int = 4096, + first_prompt: str = r"[%$#\]]+", + target_prompt: str = r"moler_bash#", + set_prompt_cmd: str = 'unset PROMPT_COMMAND; export PS1="moler_bash# "\n', + dimensions: Tuple[int, int] = (100, 300), + terminal_delayafterclose: float = 0.2, ): """# TODO: # 'export PS1="moler_bash\\$ "\n' would give moler_bash# for root and moler_bash$ for user :param moler_connection: Moler's connection to join with @@ -51,7 +54,7 @@ def __init__( :param dimensions: dimensions of the psuedoterminal :param terminal_delayafterclose: delay for checking if terminal was properly closed """ - super(ThreadedTerminal, self).__init__(moler_connection=moler_connection) + super().__init__(moler_connection=moler_connection) self.debug_hex_on_non_printable_chars = ( False # Set True to log incoming non printable chars as hex. ) @@ -60,7 +63,7 @@ def __init__( self._shell_operable: threading.Event = threading.Event() self._export_sent = False self.pulling_thread = None - self.read_buffer = "" + self.read_buffer: str = "" self._select_timeout = select_timeout self._read_buffer_size = read_buffer_size @@ -74,15 +77,16 @@ def __init__( ) self._terminal_delayafterclose = terminal_delayafterclose - def open(self): + def open(self) -> contextlib.closing: """Open ThreadedTerminal connection & start thread pulling data from it.""" - ret = super(ThreadedTerminal, self).open() + ret = super().open() if not self._terminal: self.moler_connection.open() self._terminal = PtyProcessUnicode.spawn( self._cmd, dimensions=self.dimensions ) + assert self._terminal is not None self._terminal.delayafterclose = self._terminal_delayafterclose # need to not replace not unicode data instead of raise exception self._terminal.decoder = codecs.getincrementaldecoder("utf-8")( @@ -112,12 +116,12 @@ def open(self): return ret - def close(self): + def close(self) -> None: """Close ThreadedTerminal connection & stop pulling thread.""" if self.pulling_thread: self.pulling_thread.join() self.moler_connection.shutdown() - super(ThreadedTerminal, self).close() + super().close() if self._terminal and self._terminal.isalive(): self._notify_on_disconnect() @@ -131,19 +135,20 @@ def close(self): self.pulling_thread = None self.read_buffer = "" - def send(self, data): + def send(self, data: str) -> None: """Write data into ThreadedTerminal connection.""" if self._terminal: self._terminal.write(data) @tracked_thread.log_exit_exception - def pull_data(self, pulling_done): + def pull_data(self, pulling_done: threading.Event) -> None: """Pull data from ThreadedTerminal connection.""" logging.getLogger("moler_threads").debug(f"ENTER {self}") - heartbeat = tracked_thread.report_alive() - reads = [] + heartbeat: bool = tracked_thread.report_alive() + reads: List[int] = [] while not pulling_done.is_set(): + assert self._terminal is not None if next(heartbeat): logging.getLogger("moler_threads").debug(f"ALIVE {self}") try: @@ -174,7 +179,7 @@ def pull_data(self, pulling_done): pulling_done.set() logging.getLogger("moler_threads").debug(f"EXIT {self}") - def _verify_shell_is_operable(self, data): + def _verify_shell_is_operable(self, data: str) -> None: self.read_buffer = self.read_buffer + data lines = self.read_buffer.splitlines() From ab484ec7ff9ea68e9cef57f998ca1c80a45f1ff3 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Thu, 8 Jan 2026 15:45:08 +0100 Subject: [PATCH 04/20] Pythin 3.14 --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 52800f247..dd770b060 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -9,7 +9,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] - python-version: [3.8, 3.9, "3.10", 3.11, 3.12, 3.13] + python-version: [3.8, 3.9, "3.10", 3.11, 3.12, 3.13, 3.14] exclude: - os: macos-latest python-version: 3.8 From 608a93e1b5b9f4f7698595bc3bd4888ecc99dfd8 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Fri, 9 Jan 2026 13:24:14 +0100 Subject: [PATCH 05/20] dev --- moler/config/connections.py | 32 +++ moler/io/io_connection.py | 2 +- moler/io/raw/terminal.py | 2 +- moler/io/raw/terminal_no_forkpty.py | 355 ++++++++++++++++++++++++++++ test/crt/test_unix_no_forkpty.py | 185 +++++++++++++++ 5 files changed, 574 insertions(+), 2 deletions(-) create mode 100644 moler/io/raw/terminal_no_forkpty.py create mode 100644 test/crt/test_unix_no_forkpty.py diff --git a/moler/config/connections.py b/moler/config/connections.py index 37bea0c5f..0cbbb4922 100644 --- a/moler/config/connections.py +++ b/moler/config/connections.py @@ -196,6 +196,7 @@ def tcp_asyncio_in_thrd_conn(port, host='localhost', name=None, **kwargs): # kw def _register_builtin_unix_connections(connection_factory, moler_conn_class): from moler.io.raw.terminal import ThreadedTerminal + from moler.io.raw.terminal_no_forkpty import ThreadedTerminalNoForkPTY def terminal_thd_conn_mt(name=None): # ThreadedTerminal works on unicode so moler_connection must do no encoding @@ -212,6 +213,22 @@ def terminal_thd_conn_st(name=None): io_conn = ThreadedTerminal(moler_connection=mlr_conn) # TODO: add name, logger return io_conn + def terminal_nofork_thd_conn_mt(name=None): + # ThreadedTerminal works on unicode so moler_connection must do no encoding + # mlr_conn = mlr_conn_no_encoding(moler_conn_class, name=name) + mlr_conn = mlr_conn_no_encoding_partial_clean_vt100(moler_conn_class, name=name) + io_conn = ThreadedTerminalNoForkPTY(moler_connection=mlr_conn) # TODO: add name, logger + return io_conn + + def terminal_nofork_thd_conn_st(name=None): + # ThreadedTerminal works on unicode so moler_connection must do no encoding + # mlr_conn = mlr_conn_no_encoding(moler_conn_class, name=name) + from moler.moler_connection_for_single_thread_runner import MolerConnectionForSingleThreadRunner + mlr_conn = mlr_conn_no_encoding_partial_clean_vt100(MolerConnectionForSingleThreadRunner, name=name) + io_conn = ThreadedTerminalNoForkPTY(moler_connection=mlr_conn) # TODO: add name, logger + return io_conn + + # TODO: unify passing logger to io_conn (logger/logger_name) connection_factory.register_construction(io_type="terminal", variant="threaded", @@ -228,6 +245,21 @@ def terminal_thd_conn_st(name=None): constructor=terminal_thd_conn_st) + # TODO: unify passing logger to io_conn (logger/logger_name) + connection_factory.register_construction(io_type="terminal_no_forkpty", + variant="threaded", + constructor=terminal_nofork_thd_conn_mt) # Moler 2.0.0 will replace this to st + + # TODO: unify passing logger to io_conn (logger/logger_name) + connection_factory.register_construction(io_type="terminal_no_forkpty", + variant="multi-threaded", + constructor=terminal_nofork_thd_conn_mt) + + # TODO: unify passing logger to io_conn (logger/logger_name) + connection_factory.register_construction(io_type="terminal_no_forkpty", + variant="single-threaded", + constructor=terminal_nofork_thd_conn_st) + def _register_builtin_py3_unix_connections(connection_factory, moler_conn_class): from moler.io.asyncio.terminal import AsyncioTerminal, AsyncioInThreadTerminal diff --git a/moler/io/io_connection.py b/moler/io/io_connection.py index 52028caf9..0b7e6cc6c 100644 --- a/moler/io/io_connection.py +++ b/moler/io/io_connection.py @@ -129,7 +129,7 @@ def notify(self, callback: Callable, when: str) -> None: def subscribe_on_connection_made(self, subscriber: Callable) -> None: """ - Adds subscriber to list of functions to call when conne`ction is open/established (also reopen after close) + Adds subscriber to list of functions to call when connection is open/established (also reopen after close) :param subscriber: reference to function to call when connection is open/established :return: None """ diff --git a/moler/io/raw/terminal.py b/moler/io/raw/terminal.py index 8f38d63dd..894779e38 100644 --- a/moler/io/raw/terminal.py +++ b/moler/io/raw/terminal.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- __author__ = "Michal Ernst, Marcin Usielski, Tomasz Krol" -__copyright__ = "Copyright (C) 2018-2025, Nokia" +__copyright__ = "Copyright (C) 2018-2026, Nokia" __email__ = "michal.ernst@nokia.com, marcin.usielski@nokia.com, tomasz.krol@nokia.com" import codecs diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_forkpty.py new file mode 100644 index 000000000..ba9a0df64 --- /dev/null +++ b/moler/io/raw/terminal_no_forkpty.py @@ -0,0 +1,355 @@ +# -*- coding: utf-8 -*- +__author__ = "Marcin Usielski" +__copyright__ = "Copyright (C) 2026, Nokia" +__email__ = "marcin.usielski@nokia.com" + +import codecs +import contextlib +import os +import re +import select +import datetime +import logging +import shlex +import subprocess +import threading +import time + + +from moler.io.io_connection import IOConnection +from moler.io.raw import TillDoneThread +from moler.helpers import remove_all_known_special_chars +from moler.helpers import all_chars_to_hex +from moler.helpers import non_printable_chars_to_hex +from moler.util import tracked_thread +from moler.connection import Connection +from typing import Tuple, List, Optional + + +class PtyProcessUnicodeNotFork: + """PtyProcessUnicode without forking process.""" + def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int]=(25, 120), buffer_size: int=4096): + self.cmd = cmd + self.dimensions = dimensions + self.buffer_size = buffer_size + self.delayafterclose = 0.2 + encoding = "utf-8" + self.decoder = codecs.getincrementaldecoder(encoding)(errors='strict') + self.fd : int = -1 # File descriptor for pty master + self.pid : int = -1 # Process ID of the child process + self.slave_fd : int = -1 # File descriptor for pty slave + self.process : Optional[subprocess.Popen] = None # Subprocess.Popen object + self._closed : bool = True + + def create_pty_process(self): + """Create PtyProcessUnicode without forking process.""" + import pty + import fcntl + + # Create a new pty pair + master_fd, slave_fd = pty.openpty() + print(f"Created pty master_fd={master_fd}, slave_fd={slave_fd}") + + # Set master fd to non-blocking mode + flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) + fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + # Start the subprocess with the slave fd + process: subprocess.Popen = subprocess.Popen( + shlex.split(self.cmd), + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + start_new_session=True + ) + print(f"Started subprocess with cmd='{self.cmd}'") + print(f"Subprocess PID: {process.pid}") + + # Store the process information + self.fd = master_fd + self.slave_fd = slave_fd + self.pid = process.pid + self.process = process + self._closed = False + + # Close slave fd in parent process (child process still has it) + # We keep it open initially to prevent early closure + # os.close(slave_fd) # Commented out to keep it open + + # Give the shell time to initialize + time.sleep(0.1) + + def write(self, data: str): + """Write data to pty process.""" + if self._closed or self.fd < 0: + raise IOError("Cannot write to closed pty process") + + try: + # Convert string to bytes if necessary + if isinstance(data, str): + data_bytes = data.encode('utf-8') + else: + data_bytes = data + + # Write data to the pty master + written = os.write(self.fd, data_bytes) + return written + except OSError as e: + if e.errno == 5: # Input/output error - process might be dead + self._closed = True + raise + + def read(self, size: int) -> str: + """Read data from pty process.""" + if self._closed or self.fd < 0: + raise EOFError("Cannot read from closed pty process") + + try: + # Read raw bytes from pty master (non-blocking) + data_bytes = os.read(self.fd, size) + + if not data_bytes: + raise EOFError("End of file reached") + + # Decode bytes to string using the incremental decoder + # This handles partial UTF-8 sequences correctly + data_str = self.decoder.decode(data_bytes, final=False) + return data_str + + except OSError as e: + if e.errno == 11: # Resource temporarily unavailable (EAGAIN) + return "" # No data available, return empty string + elif e.errno == 5: # Input/output error - process might be dead + self._closed = True + raise EOFError("PTY process terminated") + else: + raise + + def close(self, force: bool = False) -> None: + """Close pty process.""" + if self._closed: + return + + import signal + import subprocess + + self._closed = True + + # Try to terminate the process gracefully first + if self.process and self.isalive(): + try: + if force: + self.process.kill() # SIGKILL + else: + self.process.terminate() # SIGTERM + + # Wait for process to end with timeout + try: + self.process.wait(timeout=self.delayafterclose) + except subprocess.TimeoutExpired: + if not force: + # If still alive and not forcing, try kill + self.process.kill() + self.process.wait(timeout=0.5) + except Exception as e: + print(f"Error terminating process: {e}") + + # Close file descriptors + if self.fd >= 0: + try: + os.close(self.fd) + except OSError: + pass + self.fd = -1 + + if self.slave_fd >= 0: + try: + os.close(self.slave_fd) + except OSError: + pass + self.slave_fd = -1 + + def isalive(self) -> bool: + """Check if pty process is alive.""" + if self._closed or not self.process: + return False + + # Check if process is still running + poll_result = self.process.poll() + return poll_result is None # None means process is still running + + +class ThreadedTerminalNoForkPTY(IOConnection): + """ + Works on Unix (like Linux) systems only! + + ThreadedTerminalNoForkPTY is shell working under Pty + """ + + def __init__( + self, + moler_connection: Connection, + cmd: str = "/bin/bash", + select_timeout: float = 0.002, + read_buffer_size: int = 4096, + first_prompt: str = r"[%$#\]]+", + target_prompt: str = r"moler_bash#", + set_prompt_cmd: str = 'unset PROMPT_COMMAND; export PS1="moler_bash# "\n', + dimensions: Tuple[int, int] = (100, 300), + terminal_delayafterclose: float = 0.2, + ): + """# TODO: # 'export PS1="moler_bash\\$ "\n' would give moler_bash# for root and moler_bash$ for user + :param moler_connection: Moler's connection to join with + :param cmd: command to run terminal + :param select_timeout: timeout for reading data from terminal + :param read_buffer_size: buffer for reading data from terminal + :param first_prompt: default terminal prompt on host where Moler is starting + :param target_prompt: new prompt which will be set on terminal + :param set_prompt_cmd: command to change prompt with new line char on the end of string + :param dimensions: dimensions of the psuedoterminal + :param terminal_delayafterclose: delay for checking if terminal was properly closed + """ + super().__init__(moler_connection=moler_connection) + self.debug_hex_on_non_printable_chars = ( + False # Set True to log incoming non printable chars as hex. + ) + self.debug_hex_on_all_chars = False # Set True to log incoming data as hex. + self._terminal: Optional[PtyProcessUnicodeNotFork] = None + self._shell_operable: threading.Event = threading.Event() + self._export_sent = False + self.pulling_thread: Optional[threading.Thread] = None + self.read_buffer: str = "" + + self._select_timeout = select_timeout + self._read_buffer_size = read_buffer_size + self.dimensions = dimensions + self.first_prompt = first_prompt + self.target_prompt = target_prompt + self._cmd = [cmd] + self.set_prompt_cmd = set_prompt_cmd + self._re_set_prompt_cmd = re.sub( + "['\"].*['\"]", "", self.set_prompt_cmd.strip() + ) + self._terminal_delayafterclose = terminal_delayafterclose + + def open(self) -> contextlib.closing: + """Open ThreadedTerminal connection & start thread pulling data from it.""" + ret = super().open() + + if not self._terminal: + self.moler_connection.open() + # self._terminal = PtyProcessUnicode.spawn( + # self._cmd, dimensions=self.dimensions + # ) + self._terminal = PtyProcessUnicodeNotFork() + assert self._terminal is not None + self._terminal.create_pty_process() + self._terminal.delayafterclose = self._terminal_delayafterclose + # need to not replace not unicode data instead of raise exception + self._terminal.decoder = codecs.getincrementaldecoder("utf-8")( + errors="replace" + ) + + done = threading.Event() + self.pulling_thread = TillDoneThread( + target=self.pull_data, done_event=done, kwargs={"pulling_done": done} + ) + self.pulling_thread.start() + retry = 0 + is_operable = False + + timeout = 4 * 60 + start_time = time.monotonic() + + while (time.monotonic() - start_time <= timeout) and (not is_operable): + is_operable = self._shell_operable.wait(timeout=1) + if not is_operable: + buff = self.read_buffer.encode("UTF-8", "replace") + self.logger.warning( + f"Terminal open but not fully operable yet. Try {retry} after {time.monotonic() - start_time:.2f} s\nREAD_BUFFER: '{buff}'" + ) + self._terminal.write("\n") + retry += 1 + + return ret + + def close(self) -> None: + """Close ThreadedTerminal connection & stop pulling thread.""" + if self.pulling_thread: + self.pulling_thread.join() + self.moler_connection.shutdown() + super().close() + + if self._terminal and self._terminal.isalive(): + self._notify_on_disconnect() + try: + self._terminal.close(force=True) + except Exception as ex: + self.logger.warning(f"Exception while closing terminal: {ex}") + self._terminal = None + self._shell_operable.clear() + self._export_sent = False + self.pulling_thread = None + self.read_buffer = "" + + def send(self, data: str) -> None: + """Write data into ThreadedTerminal connection.""" + if self._terminal: + self._terminal.write(data) + + @tracked_thread.log_exit_exception + def pull_data(self, pulling_done: threading.Event) -> None: + """Pull data from ThreadedTerminal connection.""" + logging.getLogger("moler_threads").debug(f"ENTER {self}") + heartbeat = tracked_thread.report_alive() + reads: List[int] = [] + + while not pulling_done.is_set(): + assert self._terminal is not None + if next(heartbeat): + logging.getLogger("moler_threads").debug(f"ALIVE {self}") + try: + reads, _, _ = select.select( + [self._terminal.fd], [], [], self._select_timeout + ) + except ValueError as exc: + self.logger.warning(f"'{exc.__class__}: {exc}'") + self._notify_on_disconnect() + pulling_done.set() + + if self._terminal.fd in reads: + try: + data = self._terminal.read(self._read_buffer_size) + if self.debug_hex_on_all_chars: + self.logger.debug(f"incoming data: '{all_chars_to_hex(data)}'.") + if self.debug_hex_on_non_printable_chars: + self.logger.debug( + f"incoming data: '{non_printable_chars_to_hex(data)}'." + ) + + if self._shell_operable.is_set(): + self.data_received(data=data, recv_time=datetime.datetime.now()) + else: + self._verify_shell_is_operable(data) + except EOFError: + self._notify_on_disconnect() + pulling_done.set() + logging.getLogger("moler_threads").debug(f"EXIT {self}") + + def _verify_shell_is_operable(self, data: str) -> None: + self.read_buffer = self.read_buffer + data + lines = self.read_buffer.splitlines() + + for line in lines: + line = remove_all_known_special_chars(line) + if not re.search(self._re_set_prompt_cmd, line) and re.search( + self.target_prompt, line + ): + self._notify_on_connect() + self._shell_operable.set() + self.data_received(data=self.read_buffer, recv_time=datetime.datetime.now()) + elif not self._export_sent and re.search( + self.first_prompt, self.read_buffer, re.MULTILINE + ): + self.send(self.set_prompt_cmd) + self._export_sent = True diff --git a/test/crt/test_unix_no_forkpty.py b/test/crt/test_unix_no_forkpty.py new file mode 100644 index 000000000..45d26126f --- /dev/null +++ b/test/crt/test_unix_no_forkpty.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- + +__author__ = 'Marcin Usielski' +__copyright__ = 'Copyright (C) 2024, Nokia' +__email__ = 'marcin.usielski@nokia.com' + +import pytest +import tempfile +import os +import six +import getpass +import platform +from moler.device.unixlocal import UnixLocal +from moler.event_awaiter import EventAwaiter + + +@pytest.mark.skipif('Darwin' == platform.system(), reason="No ip on Macos on Github") +def test_many_commands_wait(unix_terminal): + unix = unix_terminal + cmd_ip_link = unix.get_cmd(cmd_name="ip_link", cmd_params={'action': 'show'}) + cmd_ip_addr = unix.get_cmd(cmd_name="ip_link", cmd_params={'action': 'show'}) + cmd_env = unix.get_cmd(cmd_name="env") + cmd_ip_link.start() + cmd_ip_addr.start() + cmd_env.start() + assert True is EventAwaiter.wait_for_all(timeout=10, events=[cmd_ip_link, cmd_ip_addr, cmd_env]) + env_ret = cmd_env.result() + assert cmd_ip_addr.result() is not None + assert cmd_ip_link.result() is not None + assert "PWD" in env_ret + + +def test_many_commands_wait_ping(unix_terminal): + unix = unix_terminal + count_ping_1 = 1 + count_ping_2 = 2 + cmd_ping_1 = unix.get_cmd(cmd_name="ping", cmd_params={'options': f"-c {count_ping_1}", 'destination': '127.0.0.1'}) + cmd_ping_2 = unix.get_cmd(cmd_name="ping", cmd_params={'options': f"-c {count_ping_2}", 'destination': '127.0.0.1'}) + cmd_whoami = unix.get_cmd(cmd_name="whoami") + cmd_ping_1.start() + cmd_ping_2.start() + cmd_whoami.start() + assert True is EventAwaiter.wait_for_all(timeout=10, events=[cmd_ping_1, cmd_ping_2, cmd_whoami]) + ret_whoami = cmd_whoami.result() + assert getpass.getuser() == ret_whoami['USER'] + assert cmd_ping_1.result() is not None + assert cmd_ping_1.result()['packets_transmitted'] == count_ping_1 + assert cmd_ping_2.result() is not None + assert cmd_ping_2.result()['packets_transmitted'] == count_ping_2 + + +@pytest.mark.skipif('Darwin' == platform.system(), reason="No ip on Macos on Github") +def test_ip_link(unix_terminal): + unix = unix_terminal + cmd_ip = unix.get_cmd(cmd_name="ip_link", cmd_params={'action': 'show'}) + ret = cmd_ip() + assert ret is not None + + +@pytest.mark.skipif('Darwin' == platform.system(), reason="No ip on Macos on Github") +def test_ip_neigh(unix_terminal): + unix = unix_terminal + cmd_ip = unix.get_cmd(cmd_name="ip_neigh", cmd_params={'options': 'show'}) + ret = cmd_ip() + assert ret is not None + + +def test_enter(unix_terminal): + unix = unix_terminal + cmd_enter = unix.get_cmd(cmd_name="enter") + cmd_enter() + + +def test_env(unix_terminal): + unix = unix_terminal + cmd_env = unix.get_cmd(cmd_name="env") + ret = cmd_env() + assert "PWD" in ret + + +def test_echo(unix_terminal): + unix = unix_terminal + text = "simple" + cmd_echo = unix.get_cmd(cmd_name="echo", cmd_params={'text': text}) + ret = cmd_echo() + assert text in ret['RESULT'] + + +def test_dmesg(unix_terminal): + unix = unix_terminal + cmd_dmesg = unix.get_cmd(cmd_name="dmesg") + ret = cmd_dmesg() + assert 'LINES' in ret + + +def test_date(unix_terminal): + unix = unix_terminal + cmd_date = unix.get_cmd(cmd_name="date") + ret = cmd_date() + assert 'ZONE' in ret + assert 'TIME' in ret + + +def test_uname(unix_terminal): + unix = unix_terminal + cmd_uname = unix.get_cmd(cmd_name="uname", cmd_params={"options": "-a"}) + ret = cmd_uname() + found = False + system_name = platform.system() + for line in ret["RESULT"]: + if system_name in line: + found = True + assert found + + +def test_whoami(unix_terminal): + unix = unix_terminal + cmd_whoami = unix.get_cmd(cmd_name="whoami") + ret = cmd_whoami() + assert getpass.getuser() == ret['USER'] + + +def test_7z(unix_terminal): + unix = unix_terminal + cmd_7z = unix.get_cmd(cmd_name="7z", cmd_params={"options": "a", "archive_file": "arch.7z", "files": ["a", "b"],}) + assert cmd_7z is not None + + +@pytest.mark.skipif('Darwin' == platform.system(), reason="No md5sum on Macos on Github") +def test_cp_md5sum_cat_mv_rm_ls(unix_terminal): + unix = unix_terminal + f = tempfile.NamedTemporaryFile(delete=False) + file_content = "content" + md5sum = "f75b8179e4bbe7e2b4a074dcef62de95" + data = six.b(file_content + "\n") + f.write(data) + src = f.name + f.close() + tmp_dir = os.path.dirname(src) + src_file = os.path.basename(src) + dst_file = "dst.moler.file" + dst = os.path.join(tmp_dir, dst_file) + + cmd_cat = unix.get_cmd(cmd_name="cat", cmd_params={"path": src}) + ret = cmd_cat() + assert file_content in ret['LINES'] + + cmd_md5sum = unix.get_cmd(cmd_name="md5sum", cmd_params={"path": src}) + ret = cmd_md5sum() + assert ret["SUM"] == md5sum + + cmd_cp = unix.get_cmd(cmd_name="cp", cmd_params={"src": src, "dst": dst}) + cmd_cp() + cmd_ls = unix.get_cmd(cmd_name="ls", cmd_params={"options": f"-l {tmp_dir}"}) + ret = cmd_ls() + assert src_file in ret['files'] + assert dst_file in ret['files'] + + cmd_rm = unix.get_cmd(cmd_name="rm", cmd_params={"file": dst}) + cmd_rm() + cmd_ls = unix.get_cmd(cmd_name="ls", cmd_params={"options": '-l ' + tmp_dir}) + ret = cmd_ls() + assert src_file in ret['files'] + assert dst_file not in ret['files'] + + cmd_mv = unix.get_cmd(cmd_name="mv", cmd_params={"src": src, "dst": dst}) + cmd_mv() + cmd_ls = unix.get_cmd(cmd_name="ls", cmd_params={"options": '-l ' + tmp_dir}) + ret = cmd_ls() + assert src_file not in ret['files'] + assert dst_file in ret['files'] + + cmd_rm = unix.get_cmd(cmd_name="rm", cmd_params={"file": dst}) + cmd_rm() + cmd_ls = unix.get_cmd(cmd_name="ls", cmd_params={"options": '-l ' + tmp_dir}) + ret = cmd_ls() + assert src_file not in ret['files'] + assert dst_file not in ret['files'] + + +@pytest.fixture +def unix_terminal(): + unix = UnixLocal(io_type='terminal_no_forkpty', variant='threaded') + unix.establish_connection() + yield unix From fb4cafc0441dcfd712c3e937616c57212012aee7 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Fri, 9 Jan 2026 13:39:16 +0100 Subject: [PATCH 06/20] peppa --- moler/config/connections.py | 3 +-- moler/io/raw/terminal_no_forkpty.py | 12 ++++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/moler/config/connections.py b/moler/config/connections.py index 0cbbb4922..46052daf1 100644 --- a/moler/config/connections.py +++ b/moler/config/connections.py @@ -228,7 +228,6 @@ def terminal_nofork_thd_conn_st(name=None): io_conn = ThreadedTerminalNoForkPTY(moler_connection=mlr_conn) # TODO: add name, logger return io_conn - # TODO: unify passing logger to io_conn (logger/logger_name) connection_factory.register_construction(io_type="terminal", variant="threaded", @@ -244,7 +243,6 @@ def terminal_nofork_thd_conn_st(name=None): variant="single-threaded", constructor=terminal_thd_conn_st) - # TODO: unify passing logger to io_conn (logger/logger_name) connection_factory.register_construction(io_type="terminal_no_forkpty", variant="threaded", @@ -260,6 +258,7 @@ def terminal_nofork_thd_conn_st(name=None): variant="single-threaded", constructor=terminal_nofork_thd_conn_st) + def _register_builtin_py3_unix_connections(connection_factory, moler_conn_class): from moler.io.asyncio.terminal import AsyncioTerminal, AsyncioInThreadTerminal diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_forkpty.py index ba9a0df64..9bfad0ec9 100644 --- a/moler/io/raw/terminal_no_forkpty.py +++ b/moler/io/raw/terminal_no_forkpty.py @@ -28,18 +28,18 @@ class PtyProcessUnicodeNotFork: """PtyProcessUnicode without forking process.""" - def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int]=(25, 120), buffer_size: int=4096): + def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] =( 25, 120), buffer_size: int = 4096): self.cmd = cmd self.dimensions = dimensions self.buffer_size = buffer_size self.delayafterclose = 0.2 encoding = "utf-8" self.decoder = codecs.getincrementaldecoder(encoding)(errors='strict') - self.fd : int = -1 # File descriptor for pty master - self.pid : int = -1 # Process ID of the child process - self.slave_fd : int = -1 # File descriptor for pty slave - self.process : Optional[subprocess.Popen] = None # Subprocess.Popen object - self._closed : bool = True + self.fd: int = -1 # File descriptor for pty master + self.pid: int = -1 # Process ID of the child process + self.slave_fd: int = -1 # File descriptor for pty slave + self.process: Optional[subprocess.Popen] = None # Subprocess.Popen object + self._closed: bool = True def create_pty_process(self): """Create PtyProcessUnicode without forking process.""" From 4a13d362dcccefaebb20eaf5544857d79960a4d5 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Fri, 9 Jan 2026 13:40:55 +0100 Subject: [PATCH 07/20] pep2 --- moler/io/raw/terminal_no_forkpty.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_forkpty.py index 9bfad0ec9..769837726 100644 --- a/moler/io/raw/terminal_no_forkpty.py +++ b/moler/io/raw/terminal_no_forkpty.py @@ -28,7 +28,7 @@ class PtyProcessUnicodeNotFork: """PtyProcessUnicode without forking process.""" - def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] =( 25, 120), buffer_size: int = 4096): + def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] = (25, 120), buffer_size: int = 4096): self.cmd = cmd self.dimensions = dimensions self.buffer_size = buffer_size From ed1a6392b558390acb4d5cec4951b4438c953856 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Fri, 9 Jan 2026 15:14:11 +0100 Subject: [PATCH 08/20] tests --- test/integration/test_io_raw_terminal.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/integration/test_io_raw_terminal.py b/test/integration/test_io_raw_terminal.py index e1a434284..295f014eb 100644 --- a/test/integration/test_io_raw_terminal.py +++ b/test/integration/test_io_raw_terminal.py @@ -4,11 +4,11 @@ """ __author__ = 'Marcin Usielski, Michal Ernst' -__copyright__ = 'Copyright (C) 2018-2019, Nokia' +__copyright__ = 'Copyright (C) 2018-2026, Nokia' __email__ = 'marcin.usielski@nokia.com, michal.ernst@nokia.com' import getpass - +import sys import pytest from moler.cmd.unix.ls import Ls @@ -17,6 +17,7 @@ from moler.cmd.unix.lsof import Lsof from moler.exceptions import CommandTimeout from moler.io.raw.terminal import ThreadedTerminal +from moler.io.raw.terminal_no_forkpty import ThreadedTerminalNoForkPTY def test_terminal_cmd_whoami_during_ping(terminal_connection): @@ -88,7 +89,10 @@ def terminal_connection(): from moler.threaded_moler_connection import ThreadedMolerConnection moler_conn = ThreadedMolerConnection() - terminal = ThreadedTerminal(moler_connection=moler_conn) + if sys.version_info >= (3, 11): + terminal = ThreadedTerminalNoForkPTY(moler_connection=moler_conn) + else: + terminal = ThreadedTerminal(moler_connection=moler_conn) with terminal.open() as connection: yield connection.moler_connection From 14a5a0c578b0642d407f4e99cc56fa55ff12fcd0 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Fri, 9 Jan 2026 15:18:34 +0100 Subject: [PATCH 09/20] test with both terminals --- test/integration/test_io_raw_terminal.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/integration/test_io_raw_terminal.py b/test/integration/test_io_raw_terminal.py index 295f014eb..26cf73d60 100644 --- a/test/integration/test_io_raw_terminal.py +++ b/test/integration/test_io_raw_terminal.py @@ -84,15 +84,13 @@ def test_terminal_lsof(terminal_connection): assert ret["NUMBER"] > 1 -@pytest.fixture() -def terminal_connection(): +@pytest.fixture(params=[ThreadedTerminal, ThreadedTerminalNoForkPTY]) +def terminal_connection(request): from moler.threaded_moler_connection import ThreadedMolerConnection + terminal_class = request.param moler_conn = ThreadedMolerConnection() - if sys.version_info >= (3, 11): - terminal = ThreadedTerminalNoForkPTY(moler_connection=moler_conn) - else: - terminal = ThreadedTerminal(moler_connection=moler_conn) + terminal = terminal_class(moler_connection=moler_conn) with terminal.open() as connection: yield connection.moler_connection From 4b58c18ff52119adad140da21cb897498b339ebf Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Mon, 12 Jan 2026 09:09:42 +0100 Subject: [PATCH 10/20] new test --- test/crt/test_unix_no_forkpty.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/test/crt/test_unix_no_forkpty.py b/test/crt/test_unix_no_forkpty.py index 45d26126f..443bd0716 100644 --- a/test/crt/test_unix_no_forkpty.py +++ b/test/crt/test_unix_no_forkpty.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- __author__ = 'Marcin Usielski' -__copyright__ = 'Copyright (C) 2024, Nokia' +__copyright__ = 'Copyright (C) 2026, Nokia' __email__ = 'marcin.usielski@nokia.com' import pytest @@ -178,8 +178,33 @@ def test_cp_md5sum_cat_mv_rm_ls(unix_terminal): assert dst_file not in ret['files'] +def test_ls_from_two_terminals(unix_terminal, unix_terminal2): + unix1 = unix_terminal + unix2 = unix_terminal2 + + cmd_ping_1 = unix1.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 2", 'destination': '127.0.0.1'}) + cmd_ping_2 = unix2.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 3", 'destination': 'localhost'}) + cmd_ping_1.start() + cmd_ping_2.start() + ret1 = cmd_ping_1.await_done(timeout=10) + ret2 =cmd_ping_2.await_done(timeout=10) + assert 'packets_transmitted' in ret1 + assert ret1['packets_transmitted'] == 2 + assert 'packets_transmitted' in ret2 + assert ret2['packets_transmitted'] == 3 + + + + @pytest.fixture def unix_terminal(): unix = UnixLocal(io_type='terminal_no_forkpty', variant='threaded') unix.establish_connection() yield unix + + +@pytest.fixture +def unix_terminal2(): + unix = UnixLocal(io_type='terminal_no_forkpty', variant='threaded') + unix.establish_connection() + yield unix From da51c05d896bff7fcae8e464c3cf43bda7680564 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Mon, 12 Jan 2026 11:20:02 +0100 Subject: [PATCH 11/20] deb --- moler/io/io_connection.py | 15 ++++++----- moler/io/raw/terminal_no_forkpty.py | 41 ++++++++++------------------- test/crt/test_unix_no_forkpty.py | 11 ++++---- 3 files changed, 28 insertions(+), 39 deletions(-) diff --git a/moler/io/io_connection.py b/moler/io/io_connection.py index 0b7e6cc6c..f30e49765 100644 --- a/moler/io/io_connection.py +++ b/moler/io/io_connection.py @@ -99,13 +99,14 @@ def send(self, data: str) -> None: by moler_connection when it needs. """ - # def receive(self) -> bytes: - # """ - # Pull data bytes from external-IO: - # data = io_connection.receive() - # and then forward it to Moler's connection: - # self.moler_connection.data_received(data) - # """ + def receive(self) -> bytes: + """ + Pull data bytes from external-IO: + data = io_connection.receive() + and then forward it to Moler's connection: + self.moler_connection.data_received(data) + """ + return b"" def data_received(self, data: str, recv_time: datetime) -> None: """ diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_forkpty.py index 769837726..dbf7a0580 100644 --- a/moler/io/raw/terminal_no_forkpty.py +++ b/moler/io/raw/terminal_no_forkpty.py @@ -5,11 +5,13 @@ import codecs import contextlib +import fcntl import os import re import select import datetime import logging +import pty import shlex import subprocess import threading @@ -25,16 +27,17 @@ from moler.connection import Connection from typing import Tuple, List, Optional +# Unix only. Does not work on Windows. class PtyProcessUnicodeNotFork: """PtyProcessUnicode without forking process.""" def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] = (25, 120), buffer_size: int = 4096): - self.cmd = cmd - self.dimensions = dimensions - self.buffer_size = buffer_size - self.delayafterclose = 0.2 - encoding = "utf-8" - self.decoder = codecs.getincrementaldecoder(encoding)(errors='strict') + self.cmd: str = cmd + self.dimensions: Tuple[int, int] = dimensions + self.buffer_size: int = buffer_size + self.delayafterclose: float = 0.2 + self.encoding = "utf-8" + self.decoder = codecs.getincrementaldecoder(self.encoding)(errors='strict') self.fd: int = -1 # File descriptor for pty master self.pid: int = -1 # Process ID of the child process self.slave_fd: int = -1 # File descriptor for pty slave @@ -43,27 +46,22 @@ def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] = (25, 12 def create_pty_process(self): """Create PtyProcessUnicode without forking process.""" - import pty - import fcntl # Create a new pty pair master_fd, slave_fd = pty.openpty() - print(f"Created pty master_fd={master_fd}, slave_fd={slave_fd}") # Set master fd to non-blocking mode flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) # Start the subprocess with the slave fd - process: subprocess.Popen = subprocess.Popen( + process = subprocess.Popen( shlex.split(self.cmd), stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, start_new_session=True ) - print(f"Started subprocess with cmd='{self.cmd}'") - print(f"Subprocess PID: {process.pid}") # Store the process information self.fd = master_fd @@ -72,11 +70,6 @@ def create_pty_process(self): self.process = process self._closed = False - # Close slave fd in parent process (child process still has it) - # We keep it open initially to prevent early closure - # os.close(slave_fd) # Commented out to keep it open - - # Give the shell time to initialize time.sleep(0.1) def write(self, data: str): @@ -87,7 +80,7 @@ def write(self, data: str): try: # Convert string to bytes if necessary if isinstance(data, str): - data_bytes = data.encode('utf-8') + data_bytes = data.encode(self.encoding) else: data_bytes = data @@ -129,10 +122,6 @@ def close(self, force: bool = False) -> None: """Close pty process.""" if self._closed: return - - import signal - import subprocess - self._closed = True # Try to terminate the process gracefully first @@ -225,7 +214,7 @@ def __init__( self.dimensions = dimensions self.first_prompt = first_prompt self.target_prompt = target_prompt - self._cmd = [cmd] + self._cmd = cmd self.set_prompt_cmd = set_prompt_cmd self._re_set_prompt_cmd = re.sub( "['\"].*['\"]", "", self.set_prompt_cmd.strip() @@ -238,10 +227,8 @@ def open(self) -> contextlib.closing: if not self._terminal: self.moler_connection.open() - # self._terminal = PtyProcessUnicode.spawn( - # self._cmd, dimensions=self.dimensions - # ) - self._terminal = PtyProcessUnicodeNotFork() + self._terminal = PtyProcessUnicodeNotFork(cmd=self._cmd, dimensions=self.dimensions, + buffer_size=self._read_buffer_size) assert self._terminal is not None self._terminal.create_pty_process() self._terminal.delayafterclose = self._terminal_delayafterclose diff --git a/test/crt/test_unix_no_forkpty.py b/test/crt/test_unix_no_forkpty.py index 443bd0716..32e5fde1b 100644 --- a/test/crt/test_unix_no_forkpty.py +++ b/test/crt/test_unix_no_forkpty.py @@ -4,6 +4,7 @@ __copyright__ = 'Copyright (C) 2026, Nokia' __email__ = 'marcin.usielski@nokia.com' +import time import pytest import tempfile import os @@ -178,14 +179,16 @@ def test_cp_md5sum_cat_mv_rm_ls(unix_terminal): assert dst_file not in ret['files'] -def test_ls_from_two_terminals(unix_terminal, unix_terminal2): +def test_ping_from_two_terminals(unix_terminal, unix_terminal2): unix1 = unix_terminal unix2 = unix_terminal2 - cmd_ping_1 = unix1.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 2", 'destination': '127.0.0.1'}) - cmd_ping_2 = unix2.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 3", 'destination': 'localhost'}) + cmd_ping_1 = unix1.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 2 -i 0.2", 'destination': '127.0.0.1'}) + cmd_ping_2 = unix2.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 3 -i 0.1", 'destination': 'localhost'}) cmd_ping_1.start() cmd_ping_2.start() + assert cmd_ping_1.running() + assert cmd_ping_2.running() ret1 = cmd_ping_1.await_done(timeout=10) ret2 =cmd_ping_2.await_done(timeout=10) assert 'packets_transmitted' in ret1 @@ -194,8 +197,6 @@ def test_ls_from_two_terminals(unix_terminal, unix_terminal2): assert ret2['packets_transmitted'] == 3 - - @pytest.fixture def unix_terminal(): unix = UnixLocal(io_type='terminal_no_forkpty', variant='threaded') From 0780acfe3c058c6cb4821c5949eeb80135f2ed21 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Mon, 12 Jan 2026 11:21:30 +0100 Subject: [PATCH 12/20] pep --- moler/io/raw/terminal_no_forkpty.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_forkpty.py index dbf7a0580..749a25d46 100644 --- a/moler/io/raw/terminal_no_forkpty.py +++ b/moler/io/raw/terminal_no_forkpty.py @@ -27,8 +27,10 @@ from moler.connection import Connection from typing import Tuple, List, Optional + # Unix only. Does not work on Windows. + class PtyProcessUnicodeNotFork: """PtyProcessUnicode without forking process.""" def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] = (25, 120), buffer_size: int = 4096): From 5ebd528720c92bfcc021c50e1e86481de0dcc26c Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Mon, 12 Jan 2026 14:53:09 +0100 Subject: [PATCH 13/20] dev --- moler/io/raw/pty_process_unicode.py | 178 ++++++++++++++++++++++++++ moler/io/raw/terminal_no_forkpty.py | 190 +++++----------------------- 2 files changed, 210 insertions(+), 158 deletions(-) create mode 100644 moler/io/raw/pty_process_unicode.py diff --git a/moler/io/raw/pty_process_unicode.py b/moler/io/raw/pty_process_unicode.py new file mode 100644 index 000000000..ad133c690 --- /dev/null +++ b/moler/io/raw/pty_process_unicode.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +__author__ = "Marcin Usielski" +__copyright__ = "Copyright (C) 2026, Nokia" +__email__ = "marcin.usielski@nokia.com" + +import codecs +import fcntl +import os +import pty +import shlex +import subprocess +import time + +from typing import Tuple, Optional + + +# Unix only. Does not work on Windows. + + +class PtyProcessUnicodeNotFork: + """PtyProcessUnicode without forking process.""" + def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] = (25, 120), buffer_size: int = 4096): + """ + Initialize PtyProcessUnicodeNotFork. + :param cmd: command to run in pty process + :param dimensions: dimensions of the pty (rows, cols) + :param buffer_size: buffer size for reading data + """ + self.cmd: str = cmd + self.dimensions: Tuple[int, int] = dimensions + self.buffer_size: int = buffer_size + self.delayafterclose: float = 0.2 + self.encoding = "utf-8" + self.decoder = codecs.getincrementaldecoder(self.encoding)(errors='strict') + self.fd: int = -1 # File descriptor for pty master + self.pid: int = -1 # Process ID of the child process + self.slave_fd: int = -1 # File descriptor for pty slave + self.process: Optional[subprocess.Popen] = None # Subprocess.Popen object + self._closed: bool = True + + def create_pty_process(self) -> None: + """Create PtyProcessUnicode without forking process.""" + + # Create a new pty pair + master_fd, slave_fd = pty.openpty() + + # Set master fd to non-blocking mode + flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) + fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + # Start the subprocess with the slave fd + process = subprocess.Popen( + shlex.split(self.cmd), + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + start_new_session=True + ) + + # Store the process information + self.fd = master_fd + self.slave_fd = slave_fd + self.pid = process.pid + self.process = process + self._closed = False + + time.sleep(0.1) + + def write(self, data: str|bytes) -> int: + """ + Write data to pty process. + :param data: data to write + :return: number of bytes written + """ + if self._closed or self.fd < 0: + raise IOError("Cannot write to closed pty process") + + try: + # Convert string to bytes if necessary + if isinstance(data, str): + data_bytes = data.encode(self.encoding) + else: + data_bytes = data + + # Write data to the pty master + written = os.write(self.fd, data_bytes) + return written + except OSError as e: + if e.errno == 5: # Input/output error - process might be dead + self.close(force=True) + self._closed = True + raise + + def read(self, size: int) -> str: + """ + Read data from pty process. + :param size: number of bytes to read + :return: data read as string + """ + if self._closed or self.fd < 0: + raise EOFError("Cannot read from closed pty process") + + try: + # Read raw bytes from pty master (non-blocking) + data_bytes = os.read(self.fd, size) + + if not data_bytes: + raise EOFError("End of file reached") + + # Decode bytes to string using the incremental decoder + # This handles partial UTF-8 sequences correctly + data_str = self.decoder.decode(data_bytes, final=False) + return data_str + + except OSError as e: + if e.errno == 11: # Resource temporarily unavailable (EAGAIN) + return "" # No data available, return empty string + elif e.errno == 5: # Input/output error - process might be dead + self.close(force=True) + raise EOFError("End of file reached") + else: + raise + + def close(self, force: bool = False) -> None: + """ + Close pty process. + :param force: if True, forcefully kill the process + :return: None + """ + if self._closed: + return + self._closed = True + + # Try to terminate the process gracefully first + if self.process and self.isalive(): + try: + if force: + self.process.kill() # SIGKILL + else: + self.process.terminate() # SIGTERM + + # Wait for process to end with timeout + try: + self.process.wait(timeout=self.delayafterclose) + except subprocess.TimeoutExpired: + if not force: + # If still alive and not forcing, try kill + self.process.kill() + self.process.wait(timeout=0.5) + except Exception as e: + print(f"Error terminating process: {e}") + + # Close file descriptors + if self.fd >= 0: + try: + os.close(self.fd) + except OSError: + pass + self.fd = -1 + + if self.slave_fd >= 0: + try: + os.close(self.slave_fd) + except OSError: + pass + self.slave_fd = -1 + + def isalive(self) -> bool: + """ + Check if pty process is alive. + :return: True if process is alive, False otherwise + """ + if self._closed or not self.process: + return False + + # Check if process is still running + poll_result = self.process.poll() + return poll_result is None # None means process is still running diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_forkpty.py index 749a25d46..46e2db2fd 100644 --- a/moler/io/raw/terminal_no_forkpty.py +++ b/moler/io/raw/terminal_no_forkpty.py @@ -1,173 +1,28 @@ # -*- coding: utf-8 -*- -__author__ = "Marcin Usielski" -__copyright__ = "Copyright (C) 2026, Nokia" -__email__ = "marcin.usielski@nokia.com" +__author__ = "Michal Ernst, Marcin Usielski, Tomasz Krol" +__copyright__ = "Copyright (C) 2018-2026, Nokia" +__email__ = "michal.ernst@nokia.com, marcin.usielski@nokia.com, tomasz.krol@nokia.com" import codecs import contextlib -import fcntl -import os import re import select import datetime import logging -import pty -import shlex -import subprocess + import threading import time - +from typing import Tuple, List, Optional from moler.io.io_connection import IOConnection from moler.io.raw import TillDoneThread +from moler.io.raw.pty_process_unicode import PtyProcessUnicodeNotFork from moler.helpers import remove_all_known_special_chars from moler.helpers import all_chars_to_hex from moler.helpers import non_printable_chars_to_hex from moler.util import tracked_thread from moler.connection import Connection -from typing import Tuple, List, Optional - - -# Unix only. Does not work on Windows. - - -class PtyProcessUnicodeNotFork: - """PtyProcessUnicode without forking process.""" - def __init__(self, cmd: str = "/bin/bash", dimensions: Tuple[int, int] = (25, 120), buffer_size: int = 4096): - self.cmd: str = cmd - self.dimensions: Tuple[int, int] = dimensions - self.buffer_size: int = buffer_size - self.delayafterclose: float = 0.2 - self.encoding = "utf-8" - self.decoder = codecs.getincrementaldecoder(self.encoding)(errors='strict') - self.fd: int = -1 # File descriptor for pty master - self.pid: int = -1 # Process ID of the child process - self.slave_fd: int = -1 # File descriptor for pty slave - self.process: Optional[subprocess.Popen] = None # Subprocess.Popen object - self._closed: bool = True - - def create_pty_process(self): - """Create PtyProcessUnicode without forking process.""" - - # Create a new pty pair - master_fd, slave_fd = pty.openpty() - - # Set master fd to non-blocking mode - flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) - fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - # Start the subprocess with the slave fd - process = subprocess.Popen( - shlex.split(self.cmd), - stdin=slave_fd, - stdout=slave_fd, - stderr=slave_fd, - start_new_session=True - ) - - # Store the process information - self.fd = master_fd - self.slave_fd = slave_fd - self.pid = process.pid - self.process = process - self._closed = False - - time.sleep(0.1) - - def write(self, data: str): - """Write data to pty process.""" - if self._closed or self.fd < 0: - raise IOError("Cannot write to closed pty process") - - try: - # Convert string to bytes if necessary - if isinstance(data, str): - data_bytes = data.encode(self.encoding) - else: - data_bytes = data - - # Write data to the pty master - written = os.write(self.fd, data_bytes) - return written - except OSError as e: - if e.errno == 5: # Input/output error - process might be dead - self._closed = True - raise - - def read(self, size: int) -> str: - """Read data from pty process.""" - if self._closed or self.fd < 0: - raise EOFError("Cannot read from closed pty process") - - try: - # Read raw bytes from pty master (non-blocking) - data_bytes = os.read(self.fd, size) - - if not data_bytes: - raise EOFError("End of file reached") - - # Decode bytes to string using the incremental decoder - # This handles partial UTF-8 sequences correctly - data_str = self.decoder.decode(data_bytes, final=False) - return data_str - except OSError as e: - if e.errno == 11: # Resource temporarily unavailable (EAGAIN) - return "" # No data available, return empty string - elif e.errno == 5: # Input/output error - process might be dead - self._closed = True - raise EOFError("PTY process terminated") - else: - raise - - def close(self, force: bool = False) -> None: - """Close pty process.""" - if self._closed: - return - self._closed = True - - # Try to terminate the process gracefully first - if self.process and self.isalive(): - try: - if force: - self.process.kill() # SIGKILL - else: - self.process.terminate() # SIGTERM - - # Wait for process to end with timeout - try: - self.process.wait(timeout=self.delayafterclose) - except subprocess.TimeoutExpired: - if not force: - # If still alive and not forcing, try kill - self.process.kill() - self.process.wait(timeout=0.5) - except Exception as e: - print(f"Error terminating process: {e}") - - # Close file descriptors - if self.fd >= 0: - try: - os.close(self.fd) - except OSError: - pass - self.fd = -1 - - if self.slave_fd >= 0: - try: - os.close(self.slave_fd) - except OSError: - pass - self.slave_fd = -1 - - def isalive(self) -> bool: - """Check if pty process is alive.""" - if self._closed or not self.process: - return False - - # Check if process is still running - poll_result = self.process.poll() - return poll_result is None # None means process is still running class ThreadedTerminalNoForkPTY(IOConnection): @@ -189,7 +44,7 @@ def __init__( dimensions: Tuple[int, int] = (100, 300), terminal_delayafterclose: float = 0.2, ): - """# TODO: # 'export PS1="moler_bash\\$ "\n' would give moler_bash# for root and moler_bash$ for user + """ :param moler_connection: Moler's connection to join with :param cmd: command to run terminal :param select_timeout: timeout for reading data from terminal @@ -224,8 +79,11 @@ def __init__( self._terminal_delayafterclose = terminal_delayafterclose def open(self) -> contextlib.closing: - """Open ThreadedTerminal connection & start thread pulling data from it.""" - ret = super().open() + """ + Open ThreadedTerminal connection & start thread pulling data from it. + :return: context manager to allow for: with connection.open() as conn: + """ + context_manager = super().open() if not self._terminal: self.moler_connection.open() @@ -260,10 +118,13 @@ def open(self) -> contextlib.closing: self._terminal.write("\n") retry += 1 - return ret + return context_manager def close(self) -> None: - """Close ThreadedTerminal connection & stop pulling thread.""" + """ + Close ThreadedTerminal connection & stop pulling thread. + :return: None + """ if self.pulling_thread: self.pulling_thread.join() self.moler_connection.shutdown() @@ -282,13 +143,21 @@ def close(self) -> None: self.read_buffer = "" def send(self, data: str) -> None: - """Write data into ThreadedTerminal connection.""" + """ + Write data into ThreadedTerminal connection. + :param data: data to write + :return: None + """ if self._terminal: self._terminal.write(data) @tracked_thread.log_exit_exception def pull_data(self, pulling_done: threading.Event) -> None: - """Pull data from ThreadedTerminal connection.""" + """ + Pull data from ThreadedTerminal connection. + :param pulling_done: threading.Event to stop pulling when set + :return: None + """ logging.getLogger("moler_threads").debug(f"ENTER {self}") heartbeat = tracked_thread.report_alive() reads: List[int] = [] @@ -326,6 +195,11 @@ def pull_data(self, pulling_done: threading.Event) -> None: logging.getLogger("moler_threads").debug(f"EXIT {self}") def _verify_shell_is_operable(self, data: str) -> None: + """ + Verify if shell is operable by checking for prompts in incoming data. + :param data: incoming data to check + :return: None + """ self.read_buffer = self.read_buffer + data lines = self.read_buffer.splitlines() From 68db2c5932d048d25137d5e8cc2e896f63cb6fa2 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Mon, 12 Jan 2026 14:55:16 +0100 Subject: [PATCH 14/20] style --- moler/io/raw/pty_process_unicode.py | 2 +- moler/io/raw/terminal_no_forkpty.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/moler/io/raw/pty_process_unicode.py b/moler/io/raw/pty_process_unicode.py index ad133c690..a931a7636 100644 --- a/moler/io/raw/pty_process_unicode.py +++ b/moler/io/raw/pty_process_unicode.py @@ -66,7 +66,7 @@ def create_pty_process(self) -> None: time.sleep(0.1) - def write(self, data: str|bytes) -> int: + def write(self, data: str | bytes) -> int: """ Write data to pty process. :param data: data to write diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_forkpty.py index 46e2db2fd..74f62d1a0 100644 --- a/moler/io/raw/terminal_no_forkpty.py +++ b/moler/io/raw/terminal_no_forkpty.py @@ -24,7 +24,6 @@ from moler.connection import Connection - class ThreadedTerminalNoForkPTY(IOConnection): """ Works on Unix (like Linux) systems only! From 21825b417664b0d2d64d41d9c78729c87263d482 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Mon, 12 Jan 2026 14:59:10 +0100 Subject: [PATCH 15/20] Union for older pythons --- moler/io/raw/pty_process_unicode.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/moler/io/raw/pty_process_unicode.py b/moler/io/raw/pty_process_unicode.py index a931a7636..4cd566a65 100644 --- a/moler/io/raw/pty_process_unicode.py +++ b/moler/io/raw/pty_process_unicode.py @@ -11,7 +11,7 @@ import subprocess import time -from typing import Tuple, Optional +from typing import Optional, Tuple, Union # Unix only. Does not work on Windows. @@ -66,7 +66,7 @@ def create_pty_process(self) -> None: time.sleep(0.1) - def write(self, data: str | bytes) -> int: + def write(self, data: Union[str, bytes]) -> int: """ Write data to pty process. :param data: data to write From d0140d2aff7148e9d9bfca2d0c10646d737393d9 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Tue, 13 Jan 2026 07:42:11 +0100 Subject: [PATCH 16/20] d --- moler/config/connections.py | 6 +++--- test/crt/test_unix_no_forkpty.py | 14 +++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/moler/config/connections.py b/moler/config/connections.py index 46052daf1..9f66c8657 100644 --- a/moler/config/connections.py +++ b/moler/config/connections.py @@ -4,7 +4,7 @@ """ __author__ = 'Grzegorz Latuszek, Michal Ernst, Marcin Usielski' -__copyright__ = 'Copyright (C) 2018-2024, Nokia' +__copyright__ = 'Copyright (C) 2018-2026, Nokia' __email__ = 'grzegorz.latuszek@nokia.com, michal.ernst@nokia.com, marcin.usielski@nokia.com' import platform @@ -231,7 +231,7 @@ def terminal_nofork_thd_conn_st(name=None): # TODO: unify passing logger to io_conn (logger/logger_name) connection_factory.register_construction(io_type="terminal", variant="threaded", - constructor=terminal_thd_conn_mt) # Moler 2.0.0 will replace this to st + constructor=terminal_thd_conn_mt) # TODO: unify passing logger to io_conn (logger/logger_name) connection_factory.register_construction(io_type="terminal", @@ -246,7 +246,7 @@ def terminal_nofork_thd_conn_st(name=None): # TODO: unify passing logger to io_conn (logger/logger_name) connection_factory.register_construction(io_type="terminal_no_forkpty", variant="threaded", - constructor=terminal_nofork_thd_conn_mt) # Moler 2.0.0 will replace this to st + constructor=terminal_nofork_thd_conn_mt) # TODO: unify passing logger to io_conn (logger/logger_name) connection_factory.register_construction(io_type="terminal_no_forkpty", diff --git a/test/crt/test_unix_no_forkpty.py b/test/crt/test_unix_no_forkpty.py index 32e5fde1b..2ce2f232c 100644 --- a/test/crt/test_unix_no_forkpty.py +++ b/test/crt/test_unix_no_forkpty.py @@ -4,7 +4,6 @@ __copyright__ = 'Copyright (C) 2026, Nokia' __email__ = 'marcin.usielski@nokia.com' -import time import pytest import tempfile import os @@ -183,8 +182,13 @@ def test_ping_from_two_terminals(unix_terminal, unix_terminal2): unix1 = unix_terminal unix2 = unix_terminal2 - cmd_ping_1 = unix1.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 2 -i 0.2", 'destination': '127.0.0.1'}) - cmd_ping_2 = unix2.get_cmd(cmd_name="ping", cmd_params={'options': f"-c 3 -i 0.1", 'destination': 'localhost'}) + count1 = 2 + count2 = 3 + options1 = f"-c {count1} -i 0.2" + options2 = f"-c {count2} -i 0.1" + + cmd_ping_1 = unix1.get_cmd(cmd_name="ping", cmd_params={'options': options1, 'destination': '127.0.0.1'}) + cmd_ping_2 = unix2.get_cmd(cmd_name="ping", cmd_params={'options': options2, 'destination': 'localhost'}) cmd_ping_1.start() cmd_ping_2.start() assert cmd_ping_1.running() @@ -192,9 +196,9 @@ def test_ping_from_two_terminals(unix_terminal, unix_terminal2): ret1 = cmd_ping_1.await_done(timeout=10) ret2 =cmd_ping_2.await_done(timeout=10) assert 'packets_transmitted' in ret1 - assert ret1['packets_transmitted'] == 2 + assert ret1['packets_transmitted'] == count1 assert 'packets_transmitted' in ret2 - assert ret2['packets_transmitted'] == 3 + assert ret2['packets_transmitted'] == count2 @pytest.fixture From d125e5ff6990aa5dfcded630f18d7bf5fd515fe0 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Tue, 13 Jan 2026 09:41:05 +0100 Subject: [PATCH 17/20] testref --- test/crt/test_unix_no_forkpty.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/crt/test_unix_no_forkpty.py b/test/crt/test_unix_no_forkpty.py index 2ce2f232c..c85573391 100644 --- a/test/crt/test_unix_no_forkpty.py +++ b/test/crt/test_unix_no_forkpty.py @@ -194,7 +194,7 @@ def test_ping_from_two_terminals(unix_terminal, unix_terminal2): assert cmd_ping_1.running() assert cmd_ping_2.running() ret1 = cmd_ping_1.await_done(timeout=10) - ret2 =cmd_ping_2.await_done(timeout=10) + ret2 = cmd_ping_2.await_done(timeout=10) assert 'packets_transmitted' in ret1 assert ret1['packets_transmitted'] == count1 assert 'packets_transmitted' in ret2 From 078f5d14069e539fe196a465717eeeb1868d6b45 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Tue, 13 Jan 2026 14:19:48 +0100 Subject: [PATCH 18/20] noforkpty -> nofork --- moler/config/connections.py | 12 ++++++------ .../{terminal_no_forkpty.py => terminal_no_fork.py} | 4 ++-- ...{test_unix_no_forkpty.py => test_unix_no_fork.py} | 4 ++-- test/integration/test_io_raw_terminal.py | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) rename moler/io/raw/{terminal_no_forkpty.py => terminal_no_fork.py} (98%) rename test/crt/{test_unix_no_forkpty.py => test_unix_no_fork.py} (97%) diff --git a/moler/config/connections.py b/moler/config/connections.py index 9f66c8657..af6ea973c 100644 --- a/moler/config/connections.py +++ b/moler/config/connections.py @@ -196,7 +196,7 @@ def tcp_asyncio_in_thrd_conn(port, host='localhost', name=None, **kwargs): # kw def _register_builtin_unix_connections(connection_factory, moler_conn_class): from moler.io.raw.terminal import ThreadedTerminal - from moler.io.raw.terminal_no_forkpty import ThreadedTerminalNoForkPTY + from moler.io.raw.terminal_no_fork import ThreadedTerminalNoFork def terminal_thd_conn_mt(name=None): # ThreadedTerminal works on unicode so moler_connection must do no encoding @@ -217,7 +217,7 @@ def terminal_nofork_thd_conn_mt(name=None): # ThreadedTerminal works on unicode so moler_connection must do no encoding # mlr_conn = mlr_conn_no_encoding(moler_conn_class, name=name) mlr_conn = mlr_conn_no_encoding_partial_clean_vt100(moler_conn_class, name=name) - io_conn = ThreadedTerminalNoForkPTY(moler_connection=mlr_conn) # TODO: add name, logger + io_conn = ThreadedTerminalNoFork(moler_connection=mlr_conn) # TODO: add name, logger return io_conn def terminal_nofork_thd_conn_st(name=None): @@ -225,7 +225,7 @@ def terminal_nofork_thd_conn_st(name=None): # mlr_conn = mlr_conn_no_encoding(moler_conn_class, name=name) from moler.moler_connection_for_single_thread_runner import MolerConnectionForSingleThreadRunner mlr_conn = mlr_conn_no_encoding_partial_clean_vt100(MolerConnectionForSingleThreadRunner, name=name) - io_conn = ThreadedTerminalNoForkPTY(moler_connection=mlr_conn) # TODO: add name, logger + io_conn = ThreadedTerminalNoFork(moler_connection=mlr_conn) # TODO: add name, logger return io_conn # TODO: unify passing logger to io_conn (logger/logger_name) @@ -244,17 +244,17 @@ def terminal_nofork_thd_conn_st(name=None): constructor=terminal_thd_conn_st) # TODO: unify passing logger to io_conn (logger/logger_name) - connection_factory.register_construction(io_type="terminal_no_forkpty", + connection_factory.register_construction(io_type="terminal_no_fork", variant="threaded", constructor=terminal_nofork_thd_conn_mt) # TODO: unify passing logger to io_conn (logger/logger_name) - connection_factory.register_construction(io_type="terminal_no_forkpty", + connection_factory.register_construction(io_type="terminal_no_fork", variant="multi-threaded", constructor=terminal_nofork_thd_conn_mt) # TODO: unify passing logger to io_conn (logger/logger_name) - connection_factory.register_construction(io_type="terminal_no_forkpty", + connection_factory.register_construction(io_type="terminal_no_fork", variant="single-threaded", constructor=terminal_nofork_thd_conn_st) diff --git a/moler/io/raw/terminal_no_forkpty.py b/moler/io/raw/terminal_no_fork.py similarity index 98% rename from moler/io/raw/terminal_no_forkpty.py rename to moler/io/raw/terminal_no_fork.py index 74f62d1a0..2bb12132e 100644 --- a/moler/io/raw/terminal_no_forkpty.py +++ b/moler/io/raw/terminal_no_fork.py @@ -24,11 +24,11 @@ from moler.connection import Connection -class ThreadedTerminalNoForkPTY(IOConnection): +class ThreadedTerminalNoFork(IOConnection): """ Works on Unix (like Linux) systems only! - ThreadedTerminalNoForkPTY is shell working under Pty + ThreadedTerminalNoFork is shell working under Pty """ def __init__( diff --git a/test/crt/test_unix_no_forkpty.py b/test/crt/test_unix_no_fork.py similarity index 97% rename from test/crt/test_unix_no_forkpty.py rename to test/crt/test_unix_no_fork.py index c85573391..070957faa 100644 --- a/test/crt/test_unix_no_forkpty.py +++ b/test/crt/test_unix_no_fork.py @@ -203,13 +203,13 @@ def test_ping_from_two_terminals(unix_terminal, unix_terminal2): @pytest.fixture def unix_terminal(): - unix = UnixLocal(io_type='terminal_no_forkpty', variant='threaded') + unix = UnixLocal(io_type='terminal_no_fork', variant='threaded') unix.establish_connection() yield unix @pytest.fixture def unix_terminal2(): - unix = UnixLocal(io_type='terminal_no_forkpty', variant='threaded') + unix = UnixLocal(io_type='terminal_no_fork', variant='threaded') unix.establish_connection() yield unix diff --git a/test/integration/test_io_raw_terminal.py b/test/integration/test_io_raw_terminal.py index 26cf73d60..bf89abd10 100644 --- a/test/integration/test_io_raw_terminal.py +++ b/test/integration/test_io_raw_terminal.py @@ -17,7 +17,7 @@ from moler.cmd.unix.lsof import Lsof from moler.exceptions import CommandTimeout from moler.io.raw.terminal import ThreadedTerminal -from moler.io.raw.terminal_no_forkpty import ThreadedTerminalNoForkPTY +from moler.io.raw.terminal_no_fork import ThreadedTerminalNoFork def test_terminal_cmd_whoami_during_ping(terminal_connection): @@ -84,7 +84,7 @@ def test_terminal_lsof(terminal_connection): assert ret["NUMBER"] > 1 -@pytest.fixture(params=[ThreadedTerminal, ThreadedTerminalNoForkPTY]) +@pytest.fixture(params=[ThreadedTerminal, ThreadedTerminalNoFork]) def terminal_connection(request): from moler.threaded_moler_connection import ThreadedMolerConnection From 4d138c908372214698c724199ed598440405d7a1 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Tue, 13 Jan 2026 15:07:47 +0100 Subject: [PATCH 19/20] ret type --- moler/io/io_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moler/io/io_connection.py b/moler/io/io_connection.py index f30e49765..0677fea2a 100644 --- a/moler/io/io_connection.py +++ b/moler/io/io_connection.py @@ -138,7 +138,7 @@ def subscribe_on_connection_made(self, subscriber: Callable) -> None: self._connect_subscribers_lock, self._connect_subscribers, subscriber ) - def subscribe_on_connection_lost(self, subscriber: Callable): + def subscribe_on_connection_lost(self, subscriber: Callable) -> None: """ Adds subscriber to list of functions to call when connection is closed/disconnected :param subscriber: reference to function to call when connection is closed/disconnected From fbf70e61e38fb9802bb9e511ffdfffd2f9e0ba01 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Wed, 14 Jan 2026 14:44:47 +0100 Subject: [PATCH 20/20] d@ --- moler/io/io_connection.py | 2 +- moler/io/raw/pty_process_unicode.py | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/moler/io/io_connection.py b/moler/io/io_connection.py index 0677fea2a..37634015f 100644 --- a/moler/io/io_connection.py +++ b/moler/io/io_connection.py @@ -80,7 +80,7 @@ def open(self) -> contextlib.closing: self.moler_connection.open() return contextlib.closing(self) - def close(self): + def close(self) -> None: """Close established connection.""" def __enter__(self): diff --git a/moler/io/raw/pty_process_unicode.py b/moler/io/raw/pty_process_unicode.py index 4cd566a65..b0f33764a 100644 --- a/moler/io/raw/pty_process_unicode.py +++ b/moler/io/raw/pty_process_unicode.py @@ -13,6 +13,7 @@ from typing import Optional, Tuple, Union +from moler.exceptions import MolerException # Unix only. Does not work on Windows. @@ -73,12 +74,12 @@ def write(self, data: Union[str, bytes]) -> int: :return: number of bytes written """ if self._closed or self.fd < 0: - raise IOError("Cannot write to closed pty process") + raise MolerException("Cannot write to closed pty process") try: # Convert string to bytes if necessary if isinstance(data, str): - data_bytes = data.encode(self.encoding) + data_bytes = data.encode(self.encoding, errors='ignore') else: data_bytes = data @@ -98,14 +99,14 @@ def read(self, size: int) -> str: :return: data read as string """ if self._closed or self.fd < 0: - raise EOFError("Cannot read from closed pty process") + raise MolerException("Cannot read from closed pty process") try: # Read raw bytes from pty master (non-blocking) data_bytes = os.read(self.fd, size) if not data_bytes: - raise EOFError("End of file reached") + raise MolerException("End of file reached") # Decode bytes to string using the incremental decoder # This handles partial UTF-8 sequences correctly @@ -117,7 +118,7 @@ def read(self, size: int) -> str: return "" # No data available, return empty string elif e.errno == 5: # Input/output error - process might be dead self.close(force=True) - raise EOFError("End of file reached") + raise MolerException("End of file reached") else: raise