Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![Passing?](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml/badge.svg)](https://github.com/eclipse-volttron/lib-protocol-proxy/actions/workflows/run-tests.yml)
[![pypi version](https://img.shields.io/pypi/v/protocol-proxy.svg)](https://pypi.org/project/protocol-proxy/)

This library provides the user with the ability to automatically deploy and manager proxy processes for handling
This library provides the user with the ability to automatically deploy and manage proxy processes for handling
network communication with remote devices using various protocols. A proxy to each remote peer is established in
a separate process from the managing application. A manager class handles socket communication between the proxy
subprocess and its owner. Individual protocols are implemented as plugins to this library. Integration with
Expand All @@ -28,7 +28,7 @@ pip install lib-protocol-proxy
```

Protocol Proxy plugins should include "protocol-proxy" as a requirement, so users of existing
plugins are encouraged to instead install the plugin for that pacakge directly.
plugins are encouraged to instead install the plugin for that package directly.

# Development
This library is maintained by the VOLTTRON Development Team.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "protocol-proxy"
version = "2.0.0rc2"
version = "2.0.0rc1"
description = "A system for launching and communicating with a proxy application for network communication which runs in a separate process.."
authors = ["The VOLTTRON Development Team <volttron@pnnl.gov>"]
license = "Apache License 2.0"
Expand Down
18 changes: 17 additions & 1 deletion src/protocol_proxy/ipc/decorator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import logging

from typing import Any
from functools import wraps

_log = logging.getLogger(__name__)


# TODO: Do we need an Asyncio version of this?
# TODO: Did this work with the AsyncResult removed (just returns, possibly within greenlet)?
def callback(func):
@wraps(func)
def verify(self, ipc, headers, raw_message: bytes):
if peer := ipc.peers.get(headers.sender_id):
if headers.sender_token == peer.token:
Expand All @@ -19,3 +20,18 @@ def verify(self, ipc, headers, raw_message: bytes):
_log.warning(f'Request from unknown party: {headers.sender_id}')
return None
return verify

def async_callback(func):
@wraps(func)
async def verify(self, ipc, headers, raw_message: bytes):
if peer := ipc.peers.get(headers.sender_id):
if headers.sender_token == peer.token:
return await func(self, headers, raw_message)
else:
_log.warning(f'Unable to authenticate caller: {headers.sender_id}')
return None
else:
_log.warning(f'Request from unknown party: {headers.sender_id}')
return None

return verify
2 changes: 1 addition & 1 deletion src/protocol_proxy/ipc/gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def send(self, remote: ProtocolProxyPeer, message: ProtocolProxyMessage) -> bool
return False
if message.request_id is None:
message.request_id = self.next_request_id
self.outbound_messages[outbound] = message
self.outbound_messages[outbound] = message # TODO: Should there be a check for empty messages before this and next lines?
self.outbounds.add(outbound)
if message.response_expected:
async_result = AsyncResult()
Expand Down
21 changes: 21 additions & 0 deletions src/protocol_proxy/proxy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import json
import logging

from importlib import import_module
from pkgutil import iter_modules
from uuid import UUID

from ..ipc import IPCConnector, ProtocolProxyMessage, ProtocolProxyPeer, SocketParams
Expand All @@ -25,6 +27,7 @@ def __init__(self, *, manager_address: str, manager_port: int, manager_id: UUID,
self.registration_retry_delay: float = registration_retry_delay
self.manager_params = SocketParams(manager_address, manager_port)
self.manager = manager_id
self.apply_plugins()


@abc.abstractmethod
Expand All @@ -42,6 +45,24 @@ def get_unique_remote_id(cls, unique_remote_id: tuple) -> tuple:
def send_registration(self, remote: ProtocolProxyPeer) -> ProtocolProxyMessage:
"""Send a registration message to the remote manager."""

def apply_plugins(self):
try:
installed_plugins = import_module(f'protocol_proxy.plugins.protocol.{self.__module__.split(".")[2]}')
for m in iter_modules(installed_plugins.__path__, installed_plugins.__name__ + '.'):
if hasattr(m, 'name') and m.name.split('.')[-1]:
module = import_module(m.name)
if hasattr(module, 'PROXY_PLUGINS'):
for interface_plugin in module.PROXY_PLUGINS:
interface_plugin.plug_into(self)
except ModuleNotFoundError:
return
except AttributeError as e:
_log.warning(f'Unable to load plugin "{m.name}: {e}')
except IndexError as e:
_log.warning('Unable to determine protocol_type to load plugins.')
except Exception as e:
_log.warning(f'Unexpected error loading plugins: {e}')

def _get_registration_message(self):
# _log.debug(f'{self.proxy_name}: IN GET REGISTRATION MESSAGE')
local_address, local_port = self.get_local_socket_params()
Expand Down
54 changes: 54 additions & 0 deletions src/protocol_proxy/proxy/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging

from functools import wraps
from types import MethodType

from .base import ProtocolProxy

_log = logging.getLogger(__name__)


class ProxyPlugin:
"""
Plugin to add functionality to an existing driver interface.
Use as:
di = MyDriverInterface()
MyDriverPlugin.plug_into(di)
di.method_from_plugin() # The method is now available as if it were part of MyDriverInterface.

"""
# Plugins should specify the specific class they are extending unless they are intended
# to work with all possible interfaces or proxies (in which case specify BaseInterface/ProtocolProxy).
EXTENDED_CLASS = ProtocolProxy

# Plugins should specify a set of method names from the plugin class which will be added to the extended interface.
PLUGIN_METHODS = set()
# API_METHODS <-- {api_name: {'method_name': method_name, Optional['provides_response': bool], Optional['timeout': float]]}
PROTOCOL_PROXY_METHODS: dict[str, dict[str, str | bool | float]] = dict()

@classmethod
def plug_into(cls, proxy):
if not isinstance(proxy, cls.EXTENDED_CLASS):
raise TypeError(f'Plugin "{cls.__name__} extends {cls.EXTENDED_CLASS.__name__}'
f' not {proxy.__class__.__name__}')
for method_name in cls.PLUGIN_METHODS:
method = getattr(cls, method_name)
_log.info(f'Attaching plugin method: "{method_name}" to: {proxy}')
method = cls._add_overridden(method, getattr(proxy, method_name)) if hasattr(proxy, method_name) else method
setattr(proxy, method_name, MethodType(method, proxy))
kwargs = {}
for api_name, params in cls.PROTOCOL_PROXY_METHODS.items():
if provides_response := params.get('provides_response'):
kwargs['provides_response'] = provides_response
if timeout := params.get('timeout'):
kwargs['timeout'] = timeout
proxy.register_callback(MethodType(getattr(cls, params['method_name']), proxy), api_name, **kwargs)

@staticmethod
def _add_overridden(func, overridden_method):
@wraps(func)
def wrapper(*args, **kwargs):
kwargs['overridden'] = overridden_method
return func(*args, **kwargs)

return wrapper
Loading