diff --git a/README.md b/README.md index 6a2f6a3..fd26221 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![Passing?](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml/badge.svg)](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml) [![pypi version](https://img.shields.io/pypi/v/protocol-proxy.svg)](https://pypi.org/project/protocol-proxy/) -This library provides the user with the ability to automatically deploy and manager proxy processes for handling +This library provides the user with the ability to automatically deploy and manage proxy processes for handling network communication with remote devices using various protocols. A proxy to each remote peer is established in a separate process from the managing application. A manager class handles socket communication between the proxy subprocess and its owner. Individual protocols are implemented as plugins to this library. Integration with @@ -28,7 +28,7 @@ pip install lib-protocol-proxy ``` Protocol Proxy plugins should include "protocol-proxy" as a requirement, so users of existing -plugins are encouraged to instead install the plugin for that pacakge directly. +plugins are encouraged to instead install the plugin for that package directly. # Development This library is maintained by the VOLTTRON Development Team. diff --git a/pyproject.toml b/pyproject.toml index 099a91c..0273d8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ ignore_missing_imports = true [tool.poetry] name = "protocol-proxy" -version = "2.0.0rc2" +version = "2.0.0rc1" description = "A system for launching and communicating with a proxy application for network communication which runs in a separate process.." authors = ["The VOLTTRON Development Team "] license = "Apache License 2.0" diff --git a/src/protocol_proxy/ipc/decorator.py b/src/protocol_proxy/ipc/decorator.py index 0d57db7..54f2581 100644 --- a/src/protocol_proxy/ipc/decorator.py +++ b/src/protocol_proxy/ipc/decorator.py @@ -1,6 +1,6 @@ import logging -from typing import Any +from functools import wraps _log = logging.getLogger(__name__) @@ -8,6 +8,7 @@ # TODO: Do we need an Asyncio version of this? # TODO: Did this work with the AsyncResult removed (just returns, possibly within greenlet)? def callback(func): + @wraps(func) def verify(self, ipc, headers, raw_message: bytes): if peer := ipc.peers.get(headers.sender_id): if headers.sender_token == peer.token: @@ -19,3 +20,18 @@ def verify(self, ipc, headers, raw_message: bytes): _log.warning(f'Request from unknown party: {headers.sender_id}') return None return verify + +def async_callback(func): + @wraps(func) + async def verify(self, ipc, headers, raw_message: bytes): + if peer := ipc.peers.get(headers.sender_id): + if headers.sender_token == peer.token: + return await func(self, headers, raw_message) + else: + _log.warning(f'Unable to authenticate caller: {headers.sender_id}') + return None + else: + _log.warning(f'Request from unknown party: {headers.sender_id}') + return None + + return verify diff --git a/src/protocol_proxy/ipc/gevent.py b/src/protocol_proxy/ipc/gevent.py index 8e65a15..a18762a 100644 --- a/src/protocol_proxy/ipc/gevent.py +++ b/src/protocol_proxy/ipc/gevent.py @@ -107,7 +107,7 @@ def send(self, remote: ProtocolProxyPeer, message: ProtocolProxyMessage) -> bool return False if message.request_id is None: message.request_id = self.next_request_id - self.outbound_messages[outbound] = message + self.outbound_messages[outbound] = message # TODO: Should there be a check for empty messages before this and next lines? self.outbounds.add(outbound) if message.response_expected: async_result = AsyncResult() diff --git a/src/protocol_proxy/proxy/base.py b/src/protocol_proxy/proxy/base.py index d5fd9f8..c6f17e8 100644 --- a/src/protocol_proxy/proxy/base.py +++ b/src/protocol_proxy/proxy/base.py @@ -2,6 +2,8 @@ import json import logging +from importlib import import_module +from pkgutil import iter_modules from uuid import UUID from ..ipc import IPCConnector, ProtocolProxyMessage, ProtocolProxyPeer, SocketParams @@ -25,6 +27,7 @@ def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID, self.registration_retry_delay: float = registration_retry_delay self.manager_params = SocketParams(manager_address, manager_port) self.manager = manager_id + self.apply_plugins() @abc.abstractmethod @@ -42,6 +45,24 @@ def get_unique_remote_id(cls, unique_remote_id: tuple) -> tuple: def send_registration(self, remote: ProtocolProxyPeer) -> ProtocolProxyMessage: """Send a registration message to the remote manager.""" + def apply_plugins(self): + try: + installed_plugins = import_module(f'protocol_proxy.plugins.protocol.{self.__module__.split(".")[2]}') + for m in iter_modules(installed_plugins.__path__, installed_plugins.__name__ + '.'): + if hasattr(m, 'name') and m.name.split('.')[-1]: + module = import_module(m.name) + if hasattr(module, 'PROXY_PLUGINS'): + for interface_plugin in module.PROXY_PLUGINS: + interface_plugin.plug_into(self) + except ModuleNotFoundError: + return + except AttributeError as e: + _log.warning(f'Unable to load plugin "{m.name}: {e}') + except IndexError as e: + _log.warning('Unable to determine protocol_type to load plugins.') + except Exception as e: + _log.warning(f'Unexpected error loading plugins: {e}') + def _get_registration_message(self): # _log.debug(f'{self.proxy_name}: IN GET REGISTRATION MESSAGE') local_address, local_port = self.get_local_socket_params() diff --git a/src/protocol_proxy/proxy/plugin.py b/src/protocol_proxy/proxy/plugin.py new file mode 100644 index 0000000..bc89884 --- /dev/null +++ b/src/protocol_proxy/proxy/plugin.py @@ -0,0 +1,54 @@ +import logging + +from functools import wraps +from types import MethodType + +from .base import ProtocolProxy + +_log = logging.getLogger(__name__) + + +class ProxyPlugin: + """ + Plugin to add functionality to an existing driver interface. + Use as: + di = MyDriverInterface() + MyDriverPlugin.plug_into(di) + di.method_from_plugin() # The method is now available as if it were part of MyDriverInterface. + + """ + # Plugins should specify the specific class they are extending unless they are intended + # to work with all possible interfaces or proxies (in which case specify BaseInterface/ProtocolProxy). + EXTENDED_CLASS = ProtocolProxy + + # Plugins should specify a set of method names from the plugin class which will be added to the extended interface. + PLUGIN_METHODS = set() + # API_METHODS <-- {api_name: {'method_name': method_name, Optional['provides_response': bool], Optional['timeout': float]]} + PROTOCOL_PROXY_METHODS: dict[str, dict[str, str | bool | float]] = dict() + + @classmethod + def plug_into(cls, proxy): + if not isinstance(proxy, cls.EXTENDED_CLASS): + raise TypeError(f'Plugin "{cls.__name__} extends {cls.EXTENDED_CLASS.__name__}' + f' not {proxy.__class__.__name__}') + for method_name in cls.PLUGIN_METHODS: + method = getattr(cls, method_name) + _log.info(f'Attaching plugin method: "{method_name}" to: {proxy}') + method = cls._add_overridden(method, getattr(proxy, method_name)) if hasattr(proxy, method_name) else method + setattr(proxy, method_name, MethodType(method, proxy)) + kwargs = {} + for api_name, params in cls.PROTOCOL_PROXY_METHODS.items(): + if provides_response := params.get('provides_response'): + kwargs['provides_response'] = provides_response + if timeout := params.get('timeout'): + kwargs['timeout'] = timeout + proxy.register_callback(MethodType(getattr(cls, params['method_name']), proxy), api_name, **kwargs) + + @staticmethod + def _add_overridden(func, overridden_method): + @wraps(func) + def wrapper(*args, **kwargs): + kwargs['overridden'] = overridden_method + return func(*args, **kwargs) + + return wrapper