Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
python-version: [3.8, 3.9, "3.10", 3.11, 3.12, 3.13]
python-version: [3.8, 3.9, "3.10", 3.11, 3.12, 3.13, 3.14]
exclude:
- os: macos-latest
python-version: 3.8
Expand Down
35 changes: 33 additions & 2 deletions moler/config/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""

__author__ = 'Grzegorz Latuszek, Michal Ernst, Marcin Usielski'
__copyright__ = 'Copyright (C) 2018-2024, Nokia'
__copyright__ = 'Copyright (C) 2018-2026, Nokia'
__email__ = 'grzegorz.latuszek@nokia.com, michal.ernst@nokia.com, marcin.usielski@nokia.com'

import platform
Expand Down Expand Up @@ -196,6 +196,7 @@ def tcp_asyncio_in_thrd_conn(port, host='localhost', name=None, **kwargs): # kw

def _register_builtin_unix_connections(connection_factory, moler_conn_class):
from moler.io.raw.terminal import ThreadedTerminal
from moler.io.raw.terminal_no_fork import ThreadedTerminalNoFork

def terminal_thd_conn_mt(name=None):
# ThreadedTerminal works on unicode so moler_connection must do no encoding
Expand All @@ -212,10 +213,25 @@ def terminal_thd_conn_st(name=None):
io_conn = ThreadedTerminal(moler_connection=mlr_conn) # TODO: add name, logger
return io_conn

def terminal_nofork_thd_conn_mt(name=None):
# ThreadedTerminal works on unicode so moler_connection must do no encoding
# mlr_conn = mlr_conn_no_encoding(moler_conn_class, name=name)
mlr_conn = mlr_conn_no_encoding_partial_clean_vt100(moler_conn_class, name=name)
io_conn = ThreadedTerminalNoFork(moler_connection=mlr_conn) # TODO: add name, logger
return io_conn

def terminal_nofork_thd_conn_st(name=None):
# ThreadedTerminal works on unicode so moler_connection must do no encoding
# mlr_conn = mlr_conn_no_encoding(moler_conn_class, name=name)
from moler.moler_connection_for_single_thread_runner import MolerConnectionForSingleThreadRunner
mlr_conn = mlr_conn_no_encoding_partial_clean_vt100(MolerConnectionForSingleThreadRunner, name=name)
io_conn = ThreadedTerminalNoFork(moler_connection=mlr_conn) # TODO: add name, logger
return io_conn

# TODO: unify passing logger to io_conn (logger/logger_name)
connection_factory.register_construction(io_type="terminal",
variant="threaded",
constructor=terminal_thd_conn_mt) # Moler 2.0.0 will replace this to st
constructor=terminal_thd_conn_mt)

# TODO: unify passing logger to io_conn (logger/logger_name)
connection_factory.register_construction(io_type="terminal",
Expand All @@ -227,6 +243,21 @@ def terminal_thd_conn_st(name=None):
variant="single-threaded",
constructor=terminal_thd_conn_st)

# TODO: unify passing logger to io_conn (logger/logger_name)
connection_factory.register_construction(io_type="terminal_no_fork",
variant="threaded",
constructor=terminal_nofork_thd_conn_mt)

# TODO: unify passing logger to io_conn (logger/logger_name)
connection_factory.register_construction(io_type="terminal_no_fork",
variant="multi-threaded",
constructor=terminal_nofork_thd_conn_mt)

# TODO: unify passing logger to io_conn (logger/logger_name)
connection_factory.register_construction(io_type="terminal_no_fork",
variant="single-threaded",
constructor=terminal_nofork_thd_conn_st)


def _register_builtin_py3_unix_connections(connection_factory, moler_conn_class):
from moler.io.asyncio.terminal import AsyncioTerminal, AsyncioInThreadTerminal
Expand Down
58 changes: 29 additions & 29 deletions moler/io/io_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,45 +29,48 @@
"""

__author__ = "Grzegorz Latuszek, Marcin Usielski"
__copyright__ = "Copyright (C) 2018-2022, Nokia"
__copyright__ = "Copyright (C) 2018-2026, Nokia"
__email__ = "grzegorz.latuszek@nokia.com, marcin.usielski@nokia.com"

import contextlib
import logging
from threading import Lock
from moler.connection import Connection
from typing import Callable, List
from datetime import datetime


class IOConnection:
"""External-IO connection."""

def __init__(self, moler_connection):
def __init__(self, moler_connection: Connection):
"""
Specific constructor of external-IO should store information
how to establish connection (like host/port info)

:param moler_connection: object of abstract class moler.connection.Connection
"""
super(IOConnection, self).__init__()
self._connect_subscribers = []
self._connect_subscribers: List[Callable] = []
self._connect_subscribers_lock = Lock()
self._disconnect_subscribers = []
self._disconnect_subscribers: List[Callable] = []
self._disconnect_subscribers_lock = Lock()
self.moler_connection = moler_connection
self.__name = "UNNAMED_IO_CONNECTION"
self.moler_connection: Connection = moler_connection
self.__name: str = "UNNAMED_IO_CONNECTION"
self.logger = logging.getLogger(f"moler.connection.{self.__name}.io")
# plugin the way we output data to external world
self.moler_connection.how2send = self.send

@property
def name(self):
def name(self) -> str:
return self.__name

@name.setter
def name(self, value):
def name(self, value: str) -> None:
self.__name = value
self.logger = logging.getLogger(f"moler.connection.{self.__name}.io")

def open(self):
def open(self) -> contextlib.closing:
"""
Take 'how to establish connection' info from constructor
and open that connection.
Expand All @@ -77,7 +80,7 @@ def open(self):
self.moler_connection.open()
return contextlib.closing(self)

def close(self):
def close(self) -> None:
"""Close established connection."""

def __enter__(self):
Expand All @@ -88,35 +91,32 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False # reraise exceptions if any

def send(self, data):
def send(self, data: str) -> None:
"""
Send data bytes over external-IO.

Because of plugin done in constructor it will be called
by moler_connection when it needs.
"""

def receive(self):
def receive(self) -> bytes:
"""
Pull data bytes from external-IO:

data = io_connection.receive()

and then forward it to Moler's connection:

self.moler_connection.data_received(data)

"""
return b""

def data_received(self, data, recv_time):
def data_received(self, data: str, recv_time: datetime) -> None:
"""
Having been given data bytes from external-IO:

just forward it to Moler's connection:
"""
self.moler_connection.data_received(data=data, recv_time=recv_time)

def notify(self, callback, when):
def notify(self, callback: Callable, when: str) -> None:
"""
Adds subscriber to list of functions to call
:param callback: reference to function to call when connection is open/established
Expand All @@ -128,7 +128,7 @@ def notify(self, callback, when):
elif when == "connection_lost":
self.subscribe_on_connection_lost(subscriber=callback)

def subscribe_on_connection_made(self, subscriber):
def subscribe_on_connection_made(self, subscriber: Callable) -> None:
"""
Adds subscriber to list of functions to call when connection is open/established (also reopen after close)
:param subscriber: reference to function to call when connection is open/established
Expand All @@ -138,7 +138,7 @@ def subscribe_on_connection_made(self, subscriber):
self._connect_subscribers_lock, self._connect_subscribers, subscriber
)

def subscribe_on_connection_lost(self, subscriber):
def subscribe_on_connection_lost(self, subscriber: Callable) -> None:
"""
Adds subscriber to list of functions to call when connection is closed/disconnected
:param subscriber: reference to function to call when connection is closed/disconnected
Expand All @@ -148,7 +148,7 @@ def subscribe_on_connection_lost(self, subscriber):
self._disconnect_subscribers_lock, self._disconnect_subscribers, subscriber
)

def unsubscribe_on_connection_made(self, subscriber):
def unsubscribe_on_connection_made(self, subscriber: Callable) -> None:
"""
Remove subscriber from list of functions to call when connection is open/established (also reopen after close)
:param subscriber: reference to function registered by method subscribe_on_connection_made
Expand All @@ -158,7 +158,7 @@ def unsubscribe_on_connection_made(self, subscriber):
self._connect_subscribers_lock, self._connect_subscribers, subscriber
)

def unsubscribe_on_connection_lost(self, subscriber):
def unsubscribe_on_connection_lost(self, subscriber: Callable) -> None:
"""
Remove subscriber from list of functions to call when connection is closed/disconnected
:param subscriber: reference to function registered by method subscribe_on_connection_lost
Expand All @@ -168,45 +168,45 @@ def unsubscribe_on_connection_lost(self, subscriber):
self._disconnect_subscribers_lock, self._disconnect_subscribers, subscriber
)

def disable_logging(self):
def disable_logging(self) -> None:
"""
Disable logging incoming data.
:return: None
"""
self.moler_connection.disable_logging()

def enable_logging(self):
def enable_logging(self) -> None:
"""
Enable logging incoming data.
:return: None
"""
self.moler_connection.enable_logging()

def _notify(self, lock, subscribers):
def _notify(self, lock: Lock, subscribers: List[Callable]) -> None:
with lock:
copied_subscribers = subscribers[:]
for subscriber in copied_subscribers:
subscriber(self)

def _notify_on_connect(self):
def _notify_on_connect(self) -> None:
self.logger.info(
msg=f"Connection to: '{self.name}' has been opened.",
extra={"log_name": self.name},
)
self._notify(self._connect_subscribers_lock, self._connect_subscribers)

def _notify_on_disconnect(self):
def _notify_on_disconnect(self) -> None:
self.logger.info(
msg=f"Connection to: '{self.name}' has been closed.",
extra={"log_name": self.name},
)
self._notify(self._disconnect_subscribers_lock, self._disconnect_subscribers)

def _subscribe(self, lock, subscribers, subscriber):
def _subscribe(self, lock: Lock, subscribers: List[Callable], subscriber: Callable) -> None:
with lock:
if subscriber not in subscribers:
subscribers.append(subscriber)

def _unsubscribe(self, lock, subscribers, subscriber):
def _unsubscribe(self, lock: Lock, subscribers: List[Callable], subscriber: Callable) -> None:
with lock:
subscribers.remove(subscriber)
Loading