From 198acb7bf35962515016ac0c6662b474c03cb13b Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Tue, 28 Oct 2025 17:25:58 +0100 Subject: [PATCH 1/3] 139: Moving big chunks of code places. --- src/s2python/connection/medium/s2_medium.py | 36 ++ src/s2python/connection/medium/websocket.py | 80 +++ .../connection/s2_async_connection.py | 538 ++++++++++++++++++ src/s2python/connection/s2_sync_connection.py | 397 +++++++++++++ src/s2python/s2_parser.py | 11 +- 5 files changed, 1058 insertions(+), 4 deletions(-) create mode 100644 src/s2python/connection/medium/s2_medium.py create mode 100644 src/s2python/connection/medium/websocket.py create mode 100644 src/s2python/connection/s2_async_connection.py create mode 100644 src/s2python/connection/s2_sync_connection.py diff --git a/src/s2python/connection/medium/s2_medium.py b/src/s2python/connection/medium/s2_medium.py new file mode 100644 index 0000000..a24c0c0 --- /dev/null +++ b/src/s2python/connection/medium/s2_medium.py @@ -0,0 +1,36 @@ +import abc +from typing import AsyncGenerator + +from s2python.s2_parser import UnparsedS2Message + + +class S2MediumException(Exception): + ... + +class MediumClosedConnectionError(S2MediumException): + ... + +class MediumCouldNotConnectError(S2MediumException): + ... + + +class S2Medium(abc.ABC): + @abc.abstractmethod + async def connect(self) -> None: + ... + + @abc.abstractmethod + async def is_connected(self) -> bool: + ... + + @abc.abstractmethod + async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: + ... + + @abc.abstractmethod + async def send(self, message: str) -> None: + ... + + @abc.abstractmethod + async def close(self) -> None: + ... \ No newline at end of file diff --git a/src/s2python/connection/medium/websocket.py b/src/s2python/connection/medium/websocket.py new file mode 100644 index 0000000..3d39c4c --- /dev/null +++ b/src/s2python/connection/medium/websocket.py @@ -0,0 +1,80 @@ +import logging +import ssl +from typing import Generator, AsyncGenerator, Optional, Dict, Any + +from s2python.s2_parser import UnparsedS2Message + +try: + import websockets + from websockets.asyncio.client import ( + ClientConnection as WSConnection, + connect as ws_connect, + ) +except ImportError as exc: + raise ImportError( + "The 'websockets' package is required. Run 'pip install s2-python[ws]' to use this feature." + ) from exc + +from s2python.connection.medium.s2_medium import S2Medium, MediumClosedConnectionError, MediumCouldNotConnectError + +logger = logging.getLogger("s2python") + + +class WebsocketMedium(S2Medium): + url: str + + _ws: Optional[WSConnection] + _verify_certificate: bool + _bearer_token: Optional[str] + _closed: bool + + def __init__(self, url: str, verify_certificate: bool = True, bearer_token: Optional[str] = None) -> None: + self.url = url + + self._ws = None + self._verify_certificate = verify_certificate + self._bearer_token = bearer_token + self._closed = False + + async def connect(self) -> None: + try: + # set up connection arguments for SSL and bearer token, if required + connection_kwargs: Dict[str, Any] = {} + if self.url.startswith("wss://") and not self._verify_certificate: + connection_kwargs["ssl"] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + connection_kwargs["ssl"].check_hostname = False + connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE + + if self._bearer_token: + connection_kwargs["additional_headers"] = { + "Authorization": f"Bearer {self._bearer_token}" + } + + self.ws = await ws_connect(uri=self.url, **connection_kwargs) + except (EOFError, OSError, websockets.WebSocketException) as e: + self._closed = True + message = f"Could not connect due to: {e}" + logger.error(message) + raise MediumCouldNotConnectError(message) + + async def is_connected(self) -> bool: + return self.ws is not None and not self._closed + + async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: + try: + async for message in self.ws: + yield message + except websockets.WebSocketException as e: + self._closed = True + raise MediumClosedConnectionError(f'Could not receive more messages on websocket connection {self.url}') from e + + async def send(self, message: str) -> None: + try: + await self.ws.send(message) + except websockets.WebSocketException as e: + self._closed = True + raise MediumClosedConnectionError(f'Could not send message {message}') from e + + async def close(self) -> None: + await self.ws.close() + await self.ws.wait_closed() \ No newline at end of file diff --git a/src/s2python/connection/s2_async_connection.py b/src/s2python/connection/s2_async_connection.py new file mode 100644 index 0000000..f8c9ea6 --- /dev/null +++ b/src/s2python/connection/s2_async_connection.py @@ -0,0 +1,538 @@ +from s2python.connection.medium.s2_medium import S2Medium, MediumClosedConnectionError + +import asyncio +import json +import logging +import time +import threading +import uuid +from dataclasses import dataclass +from typing import Any, Coroutine, Optional, List, Type, Dict, Callable, Awaitable, Union + + + +from s2python.common import ( + ReceptionStatusValues, + ReceptionStatus, + Handshake, + EnergyManagementRole, + Role, + HandshakeResponse, + ResourceManagerDetails, + Duration, + Currency, + SelectControlType, +) +from s2python.generated.gen_s2 import CommodityQuantity +from s2python.reception_status_awaiter import ReceptionStatusAwaiter +from s2python.s2_control_type import S2ControlType +from s2python.s2_parser import S2Parser +from s2python.s2_validation_error import S2ValidationError +from s2python.message import S2Message, S2MessageWithID +from s2python.version import S2_VERSION + +logger = logging.getLogger("s2python") + + + +class CouldNotReceiveStatusReceptionError(Exception): + ... + +@dataclass +class AssetDetails: # pylint: disable=too-many-instance-attributes + resource_id: uuid.UUID + + provides_forecast: bool + provides_power_measurements: List[CommodityQuantity] + + instruction_processing_delay: Duration + roles: List[Role] + currency: Optional[Currency] = None + + name: Optional[str] = None + manufacturer: Optional[str] = None + model: Optional[str] = None + firmware_version: Optional[str] = None + serial_number: Optional[str] = None + + def to_resource_manager_details( + self, control_types: List[S2ControlType] + ) -> ResourceManagerDetails: + return ResourceManagerDetails( + available_control_types=[ + control_type.get_protocol_control_type() + for control_type in control_types + ], + currency=self.currency, + firmware_version=self.firmware_version, + instruction_processing_delay=self.instruction_processing_delay, + manufacturer=self.manufacturer, + message_id=uuid.uuid4(), + model=self.model, + name=self.name, + provides_forecast=self.provides_forecast, + provides_power_measurement_types=self.provides_power_measurements, + resource_id=self.resource_id, + roles=self.roles, + serial_number=self.serial_number, + ) + + +S2MessageHandler = Union[ + Callable[["S2Connection", S2Message, Callable[[], None]], None], + Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]], +] + + +class SendOkay: + status_is_send: threading.Event + connection: "S2Connection" + subject_message_id: uuid.UUID + + def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID): + self.status_is_send = threading.Event() + self.connection = connection + self.subject_message_id = subject_message_id + + async def run_async(self) -> None: + self.status_is_send.set() + + await self.connection._respond_with_reception_status( # pylint: disable=protected-access + subject_message_id=self.subject_message_id, + status=ReceptionStatusValues.OK, + diagnostic_label="Processed okay.", + ) + + def run_sync(self) -> None: + self.status_is_send.set() + + self.connection.respond_with_reception_status_sync( + subject_message_id=self.subject_message_id, + status=ReceptionStatusValues.OK, + diagnostic_label="Processed okay.", + ) + + async def ensure_send_async(self, type_msg: Type[S2Message]) -> None: + if not self.status_is_send.is_set(): + logger.warning( + "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " + "Sending it now.", + type_msg, + self.subject_message_id, + ) + await self.run_async() + + def ensure_send_sync(self, type_msg: Type[S2Message]) -> None: + if not self.status_is_send.is_set(): + logger.warning( + "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " + "Sending it now.", + type_msg, + self.subject_message_id, + ) + self.run_sync() + + +class MessageHandlers: + handlers: Dict[Type[S2Message], S2MessageHandler] + + def __init__(self) -> None: + self.handlers = {} + + async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None: + """Handle the S2 message using the registered handler. + + :param connection: The S2 conncetion the `msg` is received from. + :param msg: The S2 message + """ + handler = self.handlers.get(type(msg)) + if handler is not None: + send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr] + + try: + if asyncio.iscoroutinefunction(handler): + await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type] + await send_okay.ensure_send_async(type(msg)) + else: + + def do_message() -> None: + handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type] + send_okay.ensure_send_sync(type(msg)) + + eventloop = asyncio.get_event_loop() + await eventloop.run_in_executor(executor=None, func=do_message) + except Exception: + if not send_okay.status_is_send.is_set(): + await connection._respond_with_reception_status( # pylint: disable=protected-access + subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr] + status=ReceptionStatusValues.PERMANENT_ERROR, + diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long + f"an unrecoverable error occurred.", + ) + raise + else: + logger.warning( + "Received a message of type %s but no handler is registered. Ignoring the message.", + type(msg), + ) + + def register_handler( + self, msg_type: Type[S2Message], handler: S2MessageHandler + ) -> None: + """Register a coroutine function or a normal function as the handler for a specific S2 message type. + + :param msg_type: The S2 message type to attach the handler to. + :param handler: The function (asynchronuous or normal) which should handle the S2 message. + """ + self.handlers[msg_type] = handler + + +class S2AsyncRM: + connection: 'S2AsyncConnection' + + def __init__(self): + self.connection._handlers.register_handler( + SelectControlType, self._handle_select_control_type + ) + self.connection._handlers.register_handler(Handshake, self._handle_handshake) + self.connection._handlers.register_handler(HandshakeResponse, self._handle_handshake_response) + + async def _connect_as_rm(self) -> None: + await self.connection.send_msg_and_await_reception_status( + Handshake( + message_id=uuid.uuid4(), + role=self.role, + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug( + "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." + ) + + async def _handle_handshake( + self, connection: "S2AsyncConnection", message: S2Message, send_okay: Awaitable[None] + ) -> None: + if not isinstance(message, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(message), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + message.role, + message.supported_protocol_versions, + ) + await send_okay + + async def _handle_handshake_response( + self, connection: "S2AsyncConnection", message: S2Message, send_okay: Awaitable[None] + ) -> None: + if not isinstance(message, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(message), + ) + return + + logger.debug("Received HandshakeResponse %s", message.to_json()) + + logger.debug( + "CEM selected to use version %s", message.selected_protocol_version + ) + await send_okay + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + await connection.send_msg_and_await_reception_status( + self.asset_details.to_resource_manager_details(self.control_types) + ) + + async def _handle_select_control_type( + self, connection: "S2AsyncConnection", message: S2Message, send_okay: Awaitable[None] + ) -> None: + if not isinstance(message, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(message), + ) + return + + await send_okay + + logger.debug( + "CEM selected control type %s. Activating control type.", + message.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self.control_types + } + selected_control_type: Optional[S2ControlType] = ( + control_types_by_protocol_name.get(message.control_type) + ) + + if self._current_control_type is not None: + await self._eventloop.run_in_executor( + None, self._current_control_type.deactivate, self + ) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + await self._eventloop.run_in_executor( + None, self._current_control_type.activate, self + ) + self._current_control_type.register_handlers(self._handlers) + + +class S2AsyncConnection: # pylint: disable=too-many-instance-attributes + url: str + reconnect: bool + reception_status_awaiter: ReceptionStatusAwaiter + medium: S2Medium + s2_parser: S2Parser + control_types: List[S2ControlType] + role: EnergyManagementRole + asset_details: AssetDetails + + _handlers: MessageHandlers + _current_control_type: Optional[S2ControlType] + _received_messages: asyncio.Queue + + _eventloop: asyncio.AbstractEventLoop + _main_task: Optional[asyncio.Task] + _stop_event: asyncio.Event + """Stop the S2 connection permanently.""" + _restart_connection_event: asyncio.Event + """Stop the S2 connection but restart if configured.""" + _verify_certificate: bool + _bearer_token: Optional[str] + + def __init__( # pylint: disable=too-many-arguments + self, + url: str, + role: EnergyManagementRole, + control_types: List[S2ControlType], + asset_details: AssetDetails, + medium: S2Medium, + reconnect: bool = False, + verify_certificate: bool = True, + bearer_token: Optional[str] = None, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self.url = url + self.reconnect = reconnect + self.reception_status_awaiter = ReceptionStatusAwaiter() + self.medium = medium + self.s2_parser = S2Parser() + + self._handlers = MessageHandlers() + self._current_control_type = None + + self._eventloop = eventloop if eventloop is not None else asyncio.get_event_loop() + + self.control_types = control_types + self.role = role + self.asset_details = asset_details + self._verify_certificate = verify_certificate + + self._main_task = None + self._bearer_token = bearer_token + + async def start_as_rm(self) -> None: + """Start this connection as a S2 resource manager and connect to a S2 CEM. + + This method will return until the connection is stopped. + """ + logger.debug('Starting S2 connection as RM.') + + self._main_task = self._eventloop.create_task(self._run_as(self._connect_as_rm())) + + async def stop(self) -> None: + """Stop the S2 connection gracefully and wait till it stops. + + Note: Not thread-safe. Must be run from the same event loop as `start_as_rm` runs in. + """ + logger.info("Will stop the S2 connection.") + self._stop_event.set() + if self._main_task is not None: + await self._main_task + + async def _run_as(self, role_task: Coroutine[None, None, None]) -> None: + logger.debug("Connecting as S2 resource manager.") + + self._stop_event = asyncio.Event() + + first_run = True + + while (first_run or self.reconnect) and not self._stop_event.is_set(): + if not first_run: + time.sleep(1) + first_run = False + self._restart_connection_event = asyncio.Event() + await self._connect_and_run(role_task) + + logger.debug("Finished S2 connection.") + + async def _wait_till_stop(self) -> None: + await self._stop_event.wait() + + async def _wait_till_connection_restart(self) -> None: + await self._restart_connection_event.wait() + + async def _connect_and_run(self, role_task: Coroutine[None, None, None]) -> None: + self._received_messages = asyncio.Queue() + await self.medium.connect() + if self.medium.is_connected(): + background_tasks = [ + self._eventloop.create_task(self._receive_messages()), + self._eventloop.create_task(self._wait_till_stop()), + self._eventloop.create_task(self._handle_received_messages()), + self._eventloop.create_task(self._wait_till_connection_restart()), + ] + + (done, pending) = await asyncio.wait( + background_tasks, return_when=asyncio.FIRST_COMPLETED + ) + if self._current_control_type: + self._current_control_type.deactivate(self) + self._current_control_type = None + + for task in done: + try: + await task + except asyncio.CancelledError: + pass + except MediumClosedConnectionError: + logger.info("The other party closed the websocket connection.") + except Exception: + logger.exception("An error occurred in the S2 connection. Terminating current connection.") + + for task in pending: + try: + task.cancel() + await task + except (asyncio.CancelledError, Exception): + pass + + await self.medium.close() + + async def _handle_received_messages(self) -> None: + while True: + msg = await self._received_messages.get() + await self._handlers.handle_message(self, msg) + + async def _receive_messages(self) -> None: + """Receives all incoming messages in the form of a generator. + + Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed + to any calls of `send_msg_and_await_reception_status`. + """ + if self.medium is None: + raise RuntimeError( + "Cannot receive messages if websocket connection is not yet established." + ) + + logger.info("S2 connection has started to receive messages.") + + async for message in await self.medium.messages(): + try: + s2_msg: S2Message = self.s2_parser.parse_as_any_message(message) + except json.JSONDecodeError: + await self.send_and_forget( + ReceptionStatus( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Not valid json.", + ) + ) + except S2ValidationError as e: + json_msg = json.loads(message) + message_id = json_msg.get("message_id") + if message_id: + await self.respond_with_reception_status( + subject_message_id=message_id, + status=ReceptionStatusValues.INVALID_MESSAGE, + diagnostic_label=str(e), + ) + else: + await self.respond_with_reception_status( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Message appears valid json but could not find a message_id field.", + ) + else: + logger.debug("Received message %s", s2_msg.to_json()) + + if isinstance(s2_msg, ReceptionStatus): + logger.debug( + "Message is a reception status for %s so registering in cache.", + s2_msg.subject_message_id, + ) + await self.reception_status_awaiter.receive_reception_status(s2_msg) + else: + await self._received_messages.put(s2_msg) + + async def send_and_forget(self, s2_msg: S2Message) -> None: + if self.medium is None: + raise RuntimeError( + "Cannot send messages if the S2 medium is not yet established." + ) + + json_msg = s2_msg.to_json() + logger.debug("Sending message %s", json_msg) + try: + await self.medium.send(json_msg) + except MediumClosedConnectionError as e: + logger.error("Unable to send message %s due to %s", s2_msg, str(e)) + self._restart_connection_event.set() + + async def respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + logger.debug( + "Responding to message %s with status %s", subject_message_id, status + ) + await self.send_and_forget( + ReceptionStatus( + subject_message_id=subject_message_id, + status=status, + diagnostic_label=diagnostic_label, + ) + ) + + async def send_msg_and_await_reception_status( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + await self.send_and_forget(s2_msg) + logger.debug( + "Waiting for ReceptionStatus for %s %s seconds", + s2_msg.message_id, + timeout_reception_status, + ) + try: + reception_status_task = self._eventloop.create_task(self.reception_status_awaiter.wait_for_reception_status( + s2_msg.message_id, timeout_reception_status + )) + restart_event_task = self._eventloop.create_task(self._restart_connection_event.wait()) + + (done, pending) = await asyncio.wait([reception_status_task, restart_event_task], return_when=asyncio.FIRST_COMPLETED) + + if reception_status_task in done: + reception_status = await reception_status_task + else: + raise CouldNotReceiveStatusReceptionError(f"Connection restarted while waiting for ReceptionStatus for message {s2_msg.message_id}") + #TODO Still need to cancel pending tasks? + except TimeoutError: + logger.error("Did not receive a reception status on time for %s",s2_msg.message_id) + self._restart_connection_event.set() + raise + + if reception_status.status != ReceptionStatusValues.OK and raise_on_error: + raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}") + + return reception_status diff --git a/src/s2python/connection/s2_sync_connection.py b/src/s2python/connection/s2_sync_connection.py new file mode 100644 index 0000000..38579c0 --- /dev/null +++ b/src/s2python/connection/s2_sync_connection.py @@ -0,0 +1,397 @@ +from s2python.common import ReceptionStatus +from s2python.message import S2MessageWithID + + +class S2Connection: # pylint: disable=too-many-instance-attributes + url: str + reconnect: bool + reception_status_awaiter: ReceptionStatusAwaiter + ws: Optional[WSConnection] + s2_parser: S2Parser + control_types: List[S2ControlType] + role: EnergyManagementRole + asset_details: AssetDetails + + _thread: threading.Thread + + _handlers: MessageHandlers + _current_control_type: Optional[S2ControlType] + _received_messages: asyncio.Queue + + _eventloop: asyncio.AbstractEventLoop + _stop_event: asyncio.Event + _restart_connection_event: asyncio.Event + _verify_certificate: bool + _bearer_token: Optional[str] + + def __init__( # pylint: disable=too-many-arguments + self, + url: str, + role: EnergyManagementRole, + control_types: List[S2ControlType], + asset_details: AssetDetails, + reconnect: bool = False, + verify_certificate: bool = True, + bearer_token: Optional[str] = None, + ) -> None: + self.url = url + self.reconnect = reconnect + self.reception_status_awaiter = ReceptionStatusAwaiter() + self.ws = None + self.s2_parser = S2Parser() + + self._handlers = MessageHandlers() + self._current_control_type = None + + self._eventloop = asyncio.new_event_loop() + + self.control_types = control_types + self.role = role + self.asset_details = asset_details + self._verify_certificate = verify_certificate + + self._handlers.register_handler( + SelectControlType, self._handle_select_control_type_as_rm + ) + self._handlers.register_handler(Handshake, self._handle_handshake) + self._handlers.register_handler(HandshakeResponse, self._handle_handshake_response_as_rm) + self._bearer_token = bearer_token + + def start_as_rm(self) -> None: + self._run_eventloop(self._run_as_rm()) + + def _run_eventloop(self, main_task: Awaitable[None]) -> None: + self._thread = threading.current_thread() + logger.debug("Starting eventloop") + try: + self._eventloop.run_until_complete(main_task) + except asyncio.CancelledError: + pass + logger.debug("S2 connection thread has stopped.") + + def stop(self) -> None: + """Stops the S2 connection. + + Note: Ensure this method is called from a different thread than the thread running the S2 connection. + Otherwise it will block waiting on the coroutine _do_stop to terminate successfully but it can't run + the coroutine. A `RuntimeError` will be raised to prevent the indefinite block. + """ + if threading.current_thread() == self._thread: + raise RuntimeError( + "Do not call stop from the thread running the S2 connection. This results in an infinite block!" + ) + if self._eventloop.is_running(): + asyncio.run_coroutine_threadsafe(self._do_stop(), self._eventloop).result() + self._thread.join() + logger.info("Stopped the S2 connection.") + + async def _do_stop(self) -> None: + logger.info("Will stop the S2 connection.") + self._stop_event.set() + + async def _run_as_rm(self) -> None: + logger.debug("Connecting as S2 resource manager.") + + self._stop_event = asyncio.Event() + + first_run = True + + while (first_run or self.reconnect) and not self._stop_event.is_set(): + first_run = False + self._restart_connection_event = asyncio.Event() + await self._connect_and_run() + time.sleep(1) + + logger.debug("Finished S2 connection eventloop.") + + async def _connect_and_run(self) -> None: + self._received_messages = asyncio.Queue() + await self._connect_ws() + if self.ws: + + async def wait_till_stop() -> None: + await self._stop_event.wait() + + async def wait_till_connection_restart() -> None: + await self._restart_connection_event.wait() + + background_tasks = [ + self._eventloop.create_task(self._receive_messages()), + self._eventloop.create_task(wait_till_stop()), + self._eventloop.create_task(self._connect_as_rm()), + self._eventloop.create_task(wait_till_connection_restart()), + ] + + (done, pending) = await asyncio.wait( + background_tasks, return_when=asyncio.FIRST_COMPLETED + ) + if self._current_control_type: + self._current_control_type.deactivate(self) + self._current_control_type = None + + for task in done: + try: + await task + except asyncio.CancelledError: + pass + except ( + websockets.ConnectionClosedError, + websockets.ConnectionClosedOK, + ): + logger.info("The other party closed the websocket connection.") + + for task in pending: + try: + task.cancel() + await task + except asyncio.CancelledError: + pass + + await self.ws.close() + await self.ws.wait_closed() + + async def _connect_ws(self) -> None: + try: + # set up connection arguments for SSL and bearer token, if required + connection_kwargs: Dict[str, Any] = {} + if self.url.startswith("wss://") and not self._verify_certificate: + connection_kwargs["ssl"] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + connection_kwargs["ssl"].check_hostname = False + connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE + + if self._bearer_token: + connection_kwargs["additional_headers"] = { + "Authorization": f"Bearer {self._bearer_token}" + } + + self.ws = await ws_connect(uri=self.url, **connection_kwargs) + except (EOFError, OSError) as e: + logger.info("Could not connect due to: %s", str(e)) + + async def _connect_as_rm(self) -> None: + await self._send_msg_and_await_reception_status_async( + Handshake( + message_id=uuid.uuid4(), + role=self.role, + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug( + "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." + ) + + await self._handle_received_messages() + + async def _handle_handshake( + self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] + ) -> None: + if not isinstance(message, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(message), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + message.role, + message.supported_protocol_versions, + ) + await send_okay + + async def _handle_handshake_response_as_rm( + self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] + ) -> None: + if not isinstance(message, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(message), + ) + return + + logger.debug("Received HandshakeResponse %s", message.to_json()) + + logger.debug( + "CEM selected to use version %s", message.selected_protocol_version + ) + await send_okay + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + await self._send_msg_and_await_reception_status_async( + self.asset_details.to_resource_manager_details(self.control_types) + ) + + async def _handle_select_control_type_as_rm( + self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] + ) -> None: + if not isinstance(message, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(message), + ) + return + + await send_okay + + logger.debug( + "CEM selected control type %s. Activating control type.", + message.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self.control_types + } + selected_control_type: Optional[S2ControlType] = ( + control_types_by_protocol_name.get(message.control_type) + ) + + if self._current_control_type is not None: + await self._eventloop.run_in_executor( + None, self._current_control_type.deactivate, self + ) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + await self._eventloop.run_in_executor( + None, self._current_control_type.activate, self + ) + self._current_control_type.register_handlers(self._handlers) + + async def _receive_messages(self) -> None: + """Receives all incoming messages in the form of a generator. + + Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed + to any calls of `send_msg_and_await_reception_status`. + """ + if self.ws is None: + raise RuntimeError( + "Cannot receive messages if websocket connection is not yet established." + ) + + logger.info("S2 connection has started to receive messages.") + + async for message in self.ws: + try: + s2_msg: S2Message = self.s2_parser.parse_as_any_message(message) + except json.JSONDecodeError: + await self._send_and_forget( + ReceptionStatus( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Not valid json.", + ) + ) + except S2ValidationError as e: + json_msg = json.loads(message) + message_id = json_msg.get("message_id") + if message_id: + await self._respond_with_reception_status( + subject_message_id=message_id, + status=ReceptionStatusValues.INVALID_MESSAGE, + diagnostic_label=str(e), + ) + else: + await self._respond_with_reception_status( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Message appears valid json but could not find a message_id field.", + ) + else: + logger.debug("Received message %s", s2_msg.to_json()) + + if isinstance(s2_msg, ReceptionStatus): + logger.debug( + "Message is a reception status for %s so registering in cache.", + s2_msg.subject_message_id, + ) + await self.reception_status_awaiter.receive_reception_status(s2_msg) + else: + await self._received_messages.put(s2_msg) + + async def _send_and_forget(self, s2_msg: S2Message) -> None: + if self.ws is None: + raise RuntimeError( + "Cannot send messages if websocket connection is not yet established." + ) + + json_msg = s2_msg.to_json() + logger.debug("Sending message %s", json_msg) + try: + await self.ws.send(json_msg) + except websockets.ConnectionClosedError as e: + logger.error("Unable to send message %s due to %s", s2_msg, str(e)) + self._restart_connection_event.set() + + async def _respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + logger.debug( + "Responding to message %s with status %s", subject_message_id, status + ) + await self._send_and_forget( + ReceptionStatus( + subject_message_id=subject_message_id, + status=status, + diagnostic_label=diagnostic_label, + ) + ) + + def respond_with_reception_status_sync( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + asyncio.run_coroutine_threadsafe( + self._respond_with_reception_status( + subject_message_id, status, diagnostic_label + ), + self._eventloop, + ).result() + + async def _send_msg_and_await_reception_status_async( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + await self._send_and_forget(s2_msg) + logger.debug( + "Waiting for ReceptionStatus for %s %s seconds", + s2_msg.message_id, # type: ignore[attr-defined, union-attr] + timeout_reception_status, + ) + try: + reception_status = await self.reception_status_awaiter.wait_for_reception_status( + s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined, union-attr] + ) + except TimeoutError: + logger.error( + "Did not receive a reception status on time for %s", + s2_msg.message_id, # type: ignore[attr-defined, union-attr] + ) + self._stop_event.set() + raise + + if reception_status.status != ReceptionStatusValues.OK and raise_on_error: + raise RuntimeError( + f"ReceptionStatus was not OK but rather {reception_status.status}" + ) + + return reception_status + + def send_msg_and_await_reception_status_sync( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + return asyncio.run_coroutine_threadsafe( + self._send_msg_and_await_reception_status_async( + s2_msg, timeout_reception_status, raise_on_error + ), + self._eventloop, + ).result() + + async def _handle_received_messages(self) -> None: + while True: + msg = await self._received_messages.get() + await self._handlers.handle_message(self, msg) \ No newline at end of file diff --git a/src/s2python/s2_parser.py b/src/s2python/s2_parser.py index ca17306..0cddf18 100644 --- a/src/s2python/s2_parser.py +++ b/src/s2python/s2_parser.py @@ -38,6 +38,9 @@ M = TypeVar("M", bound=S2MessageComponent) +UnparsedS2Message = Union[dict[Any, Any], str, bytes] + + # May be generated with development_utilities/generate_s2_message_type_to_class.py TYPE_TO_MESSAGE_CLASS: Dict[str, Type[S2Message]] = { "FRBC.ActuatorStatus": FRBCActuatorStatus, @@ -67,13 +70,13 @@ class S2Parser: @staticmethod - def _parse_json_if_required(unparsed_message: Union[dict[Any, Any], str, bytes]) -> dict: + def _parse_json_if_required(unparsed_message: UnparsedS2Message) -> dict: if isinstance(unparsed_message, (str, bytes)): return json.loads(unparsed_message) return unparsed_message @staticmethod - def parse_as_any_message(unparsed_message: Union[dict[Any, Any], str, bytes]) -> S2Message: + def parse_as_any_message(unparsed_message: UnparsedS2Message) -> S2Message: """Parse the message as any S2 python message regardless of message type. :param unparsed_message: The message as a JSON-formatted string or as a json-parsed dictionary. @@ -94,7 +97,7 @@ def parse_as_any_message(unparsed_message: Union[dict[Any, Any], str, bytes]) -> @staticmethod def parse_as_message( - unparsed_message: Union[dict[Any, Any], str, bytes], as_message: Type[M] + unparsed_message: UnparsedS2Message, as_message: Type[M] ) -> M: """Parse the message to a specific S2 python message. @@ -108,7 +111,7 @@ def parse_as_message( @staticmethod def parse_message_type( - unparsed_message: Union[dict[Any, Any], str, bytes], + unparsed_message: UnparsedS2Message, ) -> Optional[S2MessageType]: """Parse only the message type from the unparsed message. From 7af8771ac34bf1179748b1b7ca6a78a219d95323 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Fri, 31 Oct 2025 14:51:37 +0100 Subject: [PATCH 2/3] 139: Finish up first draft before executing it. --- .../{example_frbc_rm.py => async_frbc_rm.py} | 55 +- src/s2python/connection/__init__.py | 4 + src/s2python/connection/asset_details.py | 52 ++ src/s2python/connection/async_/__init__.py | 13 + src/s2python/connection/async_/connection.py | 241 ++++++++ .../async_/control_type/__init__.py | 0 .../async_/control_type/class_based.py | 241 ++++++++ .../connection/async_/medium/s2_medium.py | 62 ++ .../{ => async_}/medium/websocket.py | 14 +- .../connection/async_/message_handlers.py | 105 ++++ src/s2python/connection/connection_events.py | 12 + src/s2python/connection/control_type.py | 11 + src/s2python/connection/medium/s2_medium.py | 36 -- .../connection/s2_async_connection.py | 538 ------------------ src/s2python/connection/s2_sync_connection.py | 397 ------------- src/s2python/connection/sync/connection.py | 127 +++++ .../sync/control_type/control_types.py | 0 src/s2python/connection/types.py | 8 + src/s2python/s2_connection.py | 40 +- src/s2python/s2_control_type.py | 103 ---- 20 files changed, 915 insertions(+), 1144 deletions(-) rename examples/{example_frbc_rm.py => async_frbc_rm.py} (79%) create mode 100644 src/s2python/connection/__init__.py create mode 100644 src/s2python/connection/asset_details.py create mode 100644 src/s2python/connection/async_/__init__.py create mode 100644 src/s2python/connection/async_/connection.py create mode 100644 src/s2python/connection/async_/control_type/__init__.py create mode 100644 src/s2python/connection/async_/control_type/class_based.py create mode 100644 src/s2python/connection/async_/medium/s2_medium.py rename src/s2python/connection/{ => async_}/medium/websocket.py (89%) create mode 100644 src/s2python/connection/async_/message_handlers.py create mode 100644 src/s2python/connection/connection_events.py create mode 100644 src/s2python/connection/control_type.py delete mode 100644 src/s2python/connection/medium/s2_medium.py delete mode 100644 src/s2python/connection/s2_async_connection.py delete mode 100644 src/s2python/connection/s2_sync_connection.py create mode 100644 src/s2python/connection/sync/connection.py create mode 100644 src/s2python/connection/sync/control_type/control_types.py create mode 100644 src/s2python/connection/types.py diff --git a/examples/example_frbc_rm.py b/examples/async_frbc_rm.py similarity index 79% rename from examples/example_frbc_rm.py rename to examples/async_frbc_rm.py index d69473d..5ddf984 100644 --- a/examples/example_frbc_rm.py +++ b/examples/async_frbc_rm.py @@ -1,4 +1,5 @@ import argparse +import asyncio from functools import partial import logging import sys @@ -8,7 +9,6 @@ from typing import Callable from s2python.common import ( - EnergyManagementRole, Duration, Role, RoleType, @@ -30,8 +30,10 @@ FRBCStorageStatus, FRBCActuatorStatus, ) -from s2python.s2_connection import S2Connection, AssetDetails -from s2python.s2_control_type import FRBCControlType, NoControlControlType +from s2python.connection import AssetDetails +from s2python.connection.async_ import S2AsyncConnection +from s2python.connection.async_.medium.websocket import WebsocketClientMedium +from s2python.connection.async_.control_type.class_based import FRBCControlType, NoControlControlType, ResourceManagerHandler from s2python.message import S2Message logger = logging.getLogger("s2python") @@ -40,22 +42,22 @@ class MyFRBCControlType(FRBCControlType): - def handle_instruction( - self, conn: S2Connection, msg: S2Message, send_okay: Callable[[], None] + async def handle_instruction( + self, connection: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] ) -> None: if not isinstance(msg, FRBCInstruction): raise RuntimeError( f"Expected an FRBCInstruction but received a message of type {type(msg)}." ) - print(f"I have received the message {msg} from {conn}") + print(f"I have received the message {msg} from {connection}") - def activate(self, conn: S2Connection) -> None: + async def activate(self, connection: S2AsyncConnection) -> None: print("The control type FRBC is now activated.") print("Time to send a FRBC SystemDescription") actuator_id = uuid.uuid4() operation_mode_id = uuid.uuid4() - conn.send_msg_and_await_reception_status_sync( + await connection.send_msg_and_await_reception_status( FRBCSystemDescription( message_id=uuid.uuid4(), valid_from=datetime.datetime.now(tz=datetime.timezone.utc), @@ -103,7 +105,7 @@ def activate(self, conn: S2Connection) -> None: ) print("Also send the target profile") - conn.send_msg_and_await_reception_status_sync( + await connection.send_msg_and_await_reception_status( FRBCFillLevelTargetProfile( message_id=uuid.uuid4(), start_time=datetime.datetime.now(tz=datetime.timezone.utc), @@ -121,12 +123,12 @@ def activate(self, conn: S2Connection) -> None: ) print("Also send the storage status.") - conn.send_msg_and_await_reception_status_sync( + await connection.send_msg_and_await_reception_status( FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0) ) print("Also send the actuator status.") - conn.send_msg_and_await_reception_status_sync( + await connection.send_msg_and_await_reception_status( FRBCActuatorStatus( message_id=uuid.uuid4(), actuator_id=actuator_id, @@ -135,15 +137,15 @@ def activate(self, conn: S2Connection) -> None: ) ) - def deactivate(self, conn: S2Connection) -> None: + async def deactivate(self, connection: S2AsyncConnection) -> None: print("The control type FRBC is now deactivated.") class MyNoControlControlType(NoControlControlType): - def activate(self, conn: S2Connection) -> None: + async def activate(self, connection: S2AsyncConnection) -> None: print("The control type NoControl is now activated.") - def deactivate(self, conn: S2Connection) -> None: + async def deactivate(self, connection: S2AsyncConnection) -> None: print("The control type NoControl is now deactivated.") @@ -152,11 +154,9 @@ def stop(s2_connection, signal_num, _current_stack_frame): s2_connection.stop() -def start_s2_session(url, client_node_id=str(uuid.uuid4())): - s2_conn = S2Connection( - url=url, - role=EnergyManagementRole.RM, - control_types=[MyFRBCControlType(), MyNoControlControlType()], +async def start_s2_session(url, client_node_id=uuid.uuid4()): + # Configure a resource manager + rm_handler = ResourceManagerHandler( asset_details=AssetDetails( resource_id=client_node_id, name="Some asset", @@ -166,14 +166,21 @@ def start_s2_session(url, client_node_id=str(uuid.uuid4())): provides_forecast=False, provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], ), - reconnect=True, - verify_certificate=False, + control_types=[MyFRBCControlType(), MyNoControlControlType()] ) + + # Setup the underlying websocket connection + ws_medium = WebsocketClientMedium(url=url, verify_certificate=False) + await ws_medium.connect() + + # Configure the S2 connection on top of the websocket connection + s2_conn = S2AsyncConnection(medium=ws_medium) + rm_handler.register_handlers(s2_conn) + await s2_conn.start() + signal.signal(signal.SIGINT, partial(stop, s2_conn)) signal.signal(signal.SIGTERM, partial(stop, s2_conn)) - s2_conn.start_as_rm() - if __name__ == "__main__": parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") @@ -185,4 +192,4 @@ def start_s2_session(url, client_node_id=str(uuid.uuid4())): ) args = parser.parse_args() - start_s2_session(args.endpoint) + asyncio.run(start_s2_session(args.endpoint)) diff --git a/src/s2python/connection/__init__.py b/src/s2python/connection/__init__.py new file mode 100644 index 0000000..ad9fbdd --- /dev/null +++ b/src/s2python/connection/__init__.py @@ -0,0 +1,4 @@ +from s2python.connection.asset_details import AssetDetails +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped, S2ConnectionEvent +from s2python.connection.control_type import RoleHandler +from s2python.connection.types import S2ConnectionEventsAndMessages diff --git a/src/s2python/connection/asset_details.py b/src/s2python/connection/asset_details.py new file mode 100644 index 0000000..880f0a3 --- /dev/null +++ b/src/s2python/connection/asset_details.py @@ -0,0 +1,52 @@ +import uuid +from dataclasses import dataclass +from typing import Optional, List + + +from s2python.common import ( + Role, + ResourceManagerDetails, + Duration, + Currency, +) +from s2python.generated.gen_s2 import CommodityQuantity +from s2python.s2_control_type import S2ControlType + +@dataclass +class AssetDetails: # pylint: disable=too-many-instance-attributes + resource_id: uuid.UUID + + provides_forecast: bool + provides_power_measurements: List[CommodityQuantity] + + instruction_processing_delay: Duration + roles: List[Role] + currency: Optional[Currency] = None + + name: Optional[str] = None + manufacturer: Optional[str] = None + model: Optional[str] = None + firmware_version: Optional[str] = None + serial_number: Optional[str] = None + + def to_resource_manager_details( + self, control_types: List[S2ControlType] + ) -> ResourceManagerDetails: + return ResourceManagerDetails( + available_control_types=[ + control_type.get_protocol_control_type() + for control_type in control_types + ], + currency=self.currency, + firmware_version=self.firmware_version, + instruction_processing_delay=self.instruction_processing_delay, + manufacturer=self.manufacturer, + message_id=uuid.uuid4(), + model=self.model, + name=self.name, + provides_forecast=self.provides_forecast, + provides_power_measurement_types=self.provides_power_measurements, + resource_id=self.resource_id, + roles=self.roles, + serial_number=self.serial_number, + ) diff --git a/src/s2python/connection/async_/__init__.py b/src/s2python/connection/async_/__init__.py new file mode 100644 index 0000000..52ca6dc --- /dev/null +++ b/src/s2python/connection/async_/__init__.py @@ -0,0 +1,13 @@ +from s2python.connection.async_.connection import S2AsyncConnection, CouldNotReceiveStatusReceptionError +from s2python.connection.async_.message_handlers import S2EventHandlerAsync +from s2python.connection.async_.medium.s2_medium import S2MediumConnection +from s2python.connection.async_.medium.websocket import WebsocketClientMedium + + +__all__ = [ + "S2AsyncConnection", + "CouldNotReceiveStatusReceptionError", + "S2EventHandlerAsync", + "S2MediumConnection", + "WebsocketClientMedium" +] diff --git a/src/s2python/connection/async_/connection.py b/src/s2python/connection/async_/connection.py new file mode 100644 index 0000000..e540c99 --- /dev/null +++ b/src/s2python/connection/async_/connection.py @@ -0,0 +1,241 @@ +from websockets import ConnectionClosed +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped +from s2python.connection.medium.s2_medium import S2MediumConnectionAsync, MediumClosedConnectionError + +import asyncio +import json +import logging +import uuid +from typing import Coroutine, Optional, List, Type, Any, Callable + +from s2python.common import ( + ReceptionStatusValues, + ReceptionStatus, +) +from s2python.connection.async_.message_handlers import MessageHandlers, S2EventHandlerAsync +from s2python.connection.types import S2ConnectionEventsAndMessages +from s2python.reception_status_awaiter import ReceptionStatusAwaiter +from s2python.s2_control_type import S2ControlType +from s2python.s2_parser import S2Parser +from s2python.s2_validation_error import S2ValidationError +from s2python.message import S2Message, S2MessageWithID +from s2python.connection.connection_events import ConnectionStarted + +logger = logging.getLogger("s2python") + + + +class CouldNotReceiveStatusReceptionError(Exception): + ... + + +class S2AsyncConnection: # pylint: disable=too-many-instance-attributes + _eventloop: asyncio.AbstractEventLoop + _main_task: Optional[asyncio.Task] + _stop_event: asyncio.Event + """Stop the S2 connection permanently.""" + _received_messages: asyncio.Queue + + _reception_status_awaiter: ReceptionStatusAwaiter + _medium: S2MediumConnectionAsync + _s2_parser: S2Parser + _handlers: MessageHandlers + + def __init__( # pylint: disable=too-many-arguments + self, + medium: S2MediumConnectionAsync, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self._eventloop = eventloop if eventloop is not None else asyncio.get_event_loop() + self._main_task = None + self._stop_event = asyncio.Event() + + self._reception_status_awaiter = ReceptionStatusAwaiter() + self._medium = medium + self._s2_parser = S2Parser() + self._handlers = MessageHandlers() + + async def start(self) -> None: + """Start this connection with the given S2 role such as resource manager or CEM and connect to the other party.""" + logger.debug('Starting S2 connection as %s.',) + + self._main_task = self._eventloop.create_task(self._run()) + + async def stop(self) -> None: + """Stop the S2 connection gracefully and wait till it stops. + + Note: Not thread-safe. Must be run from the same event loop as `start_as_rm` runs in. + Does not stop the underlying medium! + """ + logger.info("Will stop the S2 connection.") + self._stop_event.set() + if self._main_task is not None: + await self._main_task + + async def _wait_till_stop(self) -> None: + await self._stop_event.wait() + + async def _run(self) -> None: + self._received_messages = asyncio.Queue() + + if not self._medium.is_connected(): + raise MediumClosedConnectionError("Cannot start the S2 connection if the underlying medium is closed.") + + background_tasks = [ + self._eventloop.create_task(self._receive_messages()), + self._eventloop.create_task(self._wait_till_stop()), + self._eventloop.create_task(self._handle_received_messages()), + ] + + await self._handlers.handle_event(self, ConnectionStarted()) + + (done, pending) = await asyncio.wait( + background_tasks, return_when=asyncio.FIRST_COMPLETED + ) + + await self._handlers.handle_event(self, ConnectionStopped()) + + for task in done: + try: + await task + except asyncio.CancelledError: + pass + except MediumClosedConnectionError: + logger.info("The other party closed the websocket connection.") + except Exception: + logger.exception("An error occurred in the S2 connection. Terminating current connection.") + + for task in pending: + try: + task.cancel() + await task + except (asyncio.CancelledError, Exception): + pass + + async def _handle_received_messages(self) -> None: + while not self._stop_event.is_set(): + msg = await self._received_messages.get() + await self._handlers.handle_event(self, msg) + + async def _receive_messages(self) -> None: + """Receives all incoming messages in the form of a generator. + + Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed + to any calls of `send_msg_and_await_reception_status`. + """ + logger.info("S2 connection has started to receive messages.") + + async for message in await self._medium.messages(): + try: + s2_msg: S2Message = self._s2_parser.parse_as_any_message(message) + except json.JSONDecodeError: + await self.send_and_forget( + ReceptionStatus( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Not valid json.", + ) + ) + except S2ValidationError as e: + json_msg = json.loads(message) + message_id = json_msg.get("message_id") + if message_id: + await self.respond_with_reception_status( + subject_message_id=message_id, + status=ReceptionStatusValues.INVALID_MESSAGE, + diagnostic_label=str(e), + ) + else: + await self.respond_with_reception_status( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Message appears valid json but could not find a message_id field.", + ) + else: + logger.debug("Received message %s", s2_msg.to_json()) + + if isinstance(s2_msg, ReceptionStatus): + logger.debug( + "Message is a reception status for %s so registering in cache.", + s2_msg.subject_message_id, + ) + await self._reception_status_awaiter.receive_reception_status(s2_msg) + else: + await self._received_messages.put(s2_msg) + + def register_handler(self, event_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerAsync) -> None: + """Register a handler for a specific S2 message type. + + :param event_type: The S2 connection event to register the handler for. + :param handler: The handler function (asynchronous or normal) which will handle the message. + """ + self._handlers.register_handler(event_type, handler) + + def unregister_handler(self, s2_message_type: Type[S2ConnectionEventsAndMessages]) -> None: + self._handlers.unregister_handler(s2_message_type) + + async def send_and_forget(self, s2_msg: S2Message) -> None: + json_msg = s2_msg.to_json() + logger.debug("Sending message %s", json_msg) + try: + await self._medium.send(json_msg) + except MediumClosedConnectionError: + logger.error("Unable to send message %s due to %s", s2_msg, str(e)) + raise + + async def respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + logger.debug( + "Responding to message %s with status %s", subject_message_id, status + ) + await self.send_and_forget( + ReceptionStatus( + subject_message_id=subject_message_id, + status=status, + diagnostic_label=diagnostic_label, + ) + ) + + async def send_msg_and_await_reception_status( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + await self.send_and_forget(s2_msg) + logger.debug( + "Waiting for ReceptionStatus for %s %s seconds", + s2_msg.message_id, + timeout_reception_status, + ) + reception_status_task = self._eventloop.create_task(self._reception_status_awaiter.wait_for_reception_status( + s2_msg.message_id, timeout_reception_status + )) + stop_event_task = self._eventloop.create_task(self._wait_till_stop()) + + (done, pending) = await asyncio.wait([reception_status_task, stop_event_task], return_when=asyncio.FIRST_COMPLETED) + + for task in pending: + try: + task.cancel() + await task + except (asyncio.CancelledError, Exception): + pass + + if reception_status_task in done: + try: + reception_status = await reception_status_task + except TimeoutError: + logger.error("Did not receive a reception status on time for %s", s2_msg.message_id) + self._stop_event.set() + raise + else: + #stop_event_task in done + await stop_event_task + raise CouldNotReceiveStatusReceptionError(f"Connection stopped while waiting for ReceptionStatus for message {s2_msg.message_id}") + + if reception_status.status != ReceptionStatusValues.OK and raise_on_error: + raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}") + + return reception_status diff --git a/src/s2python/connection/async_/control_type/__init__.py b/src/s2python/connection/async_/control_type/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/s2python/connection/async_/control_type/class_based.py b/src/s2python/connection/async_/control_type/class_based.py new file mode 100644 index 0000000..51776c3 --- /dev/null +++ b/src/s2python/connection/async_/control_type/class_based.py @@ -0,0 +1,241 @@ +import abc +import logging +import uuid +from typing import Coroutine, Optional, List, Any, Callable + +from s2python.connection.asset_details import AssetDetails +from s2python.common import ( + Handshake, + EnergyManagementRole, + HandshakeResponse, + SelectControlType, +) +from s2python.connection.async_.connection import S2AsyncConnection +from s2python.connection.types import S2ConnectionEvent +from s2python.version import S2_VERSION + +from s2python.connection.connection_events import ConnectionStarted + +from s2python.common import ControlType as ProtocolControlType +from s2python.frbc import FRBCInstruction +from s2python.ppbc import PPBCScheduleInstruction +from s2python.ombc import OMBCInstruction +from s2python.message import S2Message + +logger = logging.getLogger("s2python") + + +class S2ControlType(abc.ABC): + @abc.abstractmethod + def get_protocol_control_type(self) -> ProtocolControlType: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... + + + +class ResourceManagerHandler: + asset_details: AssetDetails + _current_control_type: Optional[S2ControlType] + _control_types: List[S2ControlType] + + def __init__(self, control_types: List[S2ControlType], + asset_details: AssetDetails) -> None: + self.asset_details = asset_details + self._current_control_type = None + self._control_types = control_types + + def get_s2_role(self) -> EnergyManagementRole: + return EnergyManagementRole.RM + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(ConnectionStarted, self._on_connection_started) + connection.register_handler(Handshake, self._on_handshake) + connection.register_handler(HandshakeResponse, self._on_handshake_response) + connection.register_handler(SelectControlType, self._on_select_control_type) + + async def _on_connection_started(self, connection: S2AsyncConnection, _: S2ConnectionEvent, __: Optional[Coroutine[Any, Any, None]]) -> None: + await connection.send_msg_and_await_reception_status( + Handshake( + message_id=uuid.uuid4(), + role=self.get_s2_role(), + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug( + "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." + ) + + async def _on_handshake( + self, _: S2AsyncConnection, event: S2ConnectionEvent, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + event.role, + event.supported_protocol_versions, + ) + await send_okay + + async def _on_handshake_response( + self, connection: S2AsyncConnection, event: S2ConnectionEvent, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug("Received HandshakeResponse %s", event.to_json()) + logger.debug( + "CEM selected to use version %s", event.selected_protocol_version + ) + await send_okay + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + await connection.send_msg_and_await_reception_status( + self.asset_details.to_resource_manager_details(self._control_types) + ) + + async def _on_select_control_type( + self, connection: S2AsyncConnection, event: S2ConnectionEvent, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(event), + ) + return + + await send_okay + + logger.debug( + "CEM selected control type %s. Activating control type.", + event.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self._control_types + } + selected_control_type = control_types_by_protocol_name.get(event.control_type) + + if self._current_control_type is not None: + await self._current_control_type.deactivate(connection) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + self._current_control_type.register_handlers(connection) + await self._current_control_type.activate(connection) + + async def _on_connection_stop(self, connection: S2AsyncConnection, __: S2ConnectionEvent, ___: Optional[Coroutine[Any, Any, None]]): + if self._current_control_type: + await self._current_control_type.deactivate(connection) + self._current_control_type = None + + +class FRBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.FILL_RATE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(FRBCInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, conn: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PPBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_PROFILE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(PPBCScheduleInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, connection: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class OMBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.OPERATION_MODE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(OMBCInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, connection: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PEBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + pass + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... + + +class NoControlControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.NOT_CONTROLABLE + + def register_handlers(self, connection: S2AsyncConnection) -> None: + pass + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... diff --git a/src/s2python/connection/async_/medium/s2_medium.py b/src/s2python/connection/async_/medium/s2_medium.py new file mode 100644 index 0000000..ddf9d59 --- /dev/null +++ b/src/s2python/connection/async_/medium/s2_medium.py @@ -0,0 +1,62 @@ +import abc +import typing +from typing import AsyncGenerator, Awaitable, Callable + +from s2python.s2_parser import UnparsedS2Message + + +class S2MediumException(Exception): + ... + +class MediumClosedConnectionError(S2MediumException): + ... + +class MediumCouldNotConnectError(S2MediumException): + ... + + +class S2MediumConnection(abc.ABC): + @abc.abstractmethod + async def is_connected(self) -> bool: + ... + + @abc.abstractmethod + async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: + ... + + @abc.abstractmethod + async def send(self, message: str) -> None: + ... + + +# BuildS2ConnectionAsync = Callable[[S2MediumConnectionAsync], Awaitable["S2AsyncConnection"]] +# +# +# class S2MediumConnectorAsync(abc.ABC): +# """S2 medium specific factory for S2Connections.""" +# +# @abc.abstractmethod +# async def set_connection_builder(self, +# builder: BuildS2ConnectionAsync) -> None: +# ... +# +# @abc.abstractmethod +# async def run(self) -> None: +# """Start up the connection or start listening for new connections. +# +# This function may block or not depending in the implementation. +# E.g. it will block if a listening socket is opened, or it may return once a single client +# connection is established. +# """ +# ... +# +# @abc.abstractmethod +# async def close(self) -> None: +# """Close the medium connector. +# +# This does not close any functions created by the connector, only the connector itself. +# Also, this function may not be implemented in all cases. For instance, if the connector +# only creates a single client connection and then exits, there is no need to close anything +# so in those cases this function may be a no-op. +# """ +# ... diff --git a/src/s2python/connection/medium/websocket.py b/src/s2python/connection/async_/medium/websocket.py similarity index 89% rename from src/s2python/connection/medium/websocket.py rename to src/s2python/connection/async_/medium/websocket.py index 3d39c4c..2530cd8 100644 --- a/src/s2python/connection/medium/websocket.py +++ b/src/s2python/connection/async_/medium/websocket.py @@ -1,6 +1,7 @@ import logging import ssl -from typing import Generator, AsyncGenerator, Optional, Dict, Any +from typing import AsyncGenerator, Optional, Dict, Any +from typing_extensions import override from s2python.s2_parser import UnparsedS2Message @@ -15,12 +16,12 @@ "The 'websockets' package is required. Run 'pip install s2-python[ws]' to use this feature." ) from exc -from s2python.connection.medium.s2_medium import S2Medium, MediumClosedConnectionError, MediumCouldNotConnectError +from s2python.connection.async_.medium.s2_medium import MediumClosedConnectionError, MediumCouldNotConnectError, S2MediumConnection logger = logging.getLogger("s2python") -class WebsocketMedium(S2Medium): +class WebsocketClientMedium(S2MediumConnection): url: str _ws: Optional[WSConnection] @@ -57,9 +58,11 @@ async def connect(self) -> None: logger.error(message) raise MediumCouldNotConnectError(message) + @override async def is_connected(self) -> bool: return self.ws is not None and not self._closed + @override async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: try: async for message in self.ws: @@ -68,13 +71,10 @@ async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: self._closed = True raise MediumClosedConnectionError(f'Could not receive more messages on websocket connection {self.url}') from e + @override async def send(self, message: str) -> None: try: await self.ws.send(message) except websockets.WebSocketException as e: self._closed = True raise MediumClosedConnectionError(f'Could not send message {message}') from e - - async def close(self) -> None: - await self.ws.close() - await self.ws.wait_closed() \ No newline at end of file diff --git a/src/s2python/connection/async_/message_handlers.py b/src/s2python/connection/async_/message_handlers.py new file mode 100644 index 0000000..e27b995 --- /dev/null +++ b/src/s2python/connection/async_/message_handlers.py @@ -0,0 +1,105 @@ +from asyncio.events import AbstractEventLoop +from s2python.connection.async_.connection import S2AsyncConnection + +import asyncio +import logging +import uuid +from typing import Any, Coroutine, Literal, Optional, List, Type, Dict, Callable, Awaitable, Union + +from s2python.common import ReceptionStatusValues +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages +from s2python.message import S2Message, S2MessageWithID + + +logger = logging.getLogger("s2python") + +S2EventHandlerAsync = Callable[[S2AsyncConnection, S2ConnectionEvent, Optional[Coroutine[Any, Any, None]]], Coroutine[Any, Any, None]] + +class SendOkay: + _status_is_send: asyncio.Event + _connection: "S2AsyncConnection" + _subject_message_id: uuid.UUID + + def __init__(self, connection: "S2AsyncConnection", subject_message_id: uuid.UUID): + self._status_is_send = asyncio.Event() + self._connection = connection + self._subject_message_id = subject_message_id + + async def run(self) -> None: + """Send the ReceptionStatus OK asynchronously and register it.""" + self._status_is_send.set() + + await self._connection.respond_with_reception_status( # pylint: disable=protected-access + subject_message_id=self._subject_message_id, + status=ReceptionStatusValues.OK, + diagnostic_label="Processed okay.", + ) + + async def ensure_send(self, type_msg: Type[S2Message]) -> None: + """Ensure that the ReceptionStatus has been send. + + Send the ReceptionStatus OK if it hasn't been send yet. + + :param type_msg: The type of S2 message for which the ReceptionStatus should have been send. Logging purposes. + """ + if not self._status_is_send.is_set(): + logger.warning( + "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " + "Sending it now.", + type_msg, + self._subject_message_id, + ) + await self.run() + + +class MessageHandlers: + handlers: Dict[Type[S2ConnectionEventsAndMessages], S2EventHandlerAsync] + + def __init__(self) -> None: + self.handlers = {} + + async def handle_event(self, connection: S2AsyncConnection, event: S2ConnectionEventsAndMessages) -> None: + """Handle the S2 message using the registered handler. + + :param connection: The S2 conncetion the `msg` is received from. + :param msg: The S2 message + """ + handler = self.handlers.get(type(event)) + if handler is not None: + send_okay = None + try: + if isinstance(event, S2MessageWithID): + send_okay = SendOkay(connection, event.message_id) + await handler(connection, event, send_okay.run()) + else: + await handler(connection, event, None) + except Exception: + if send_okay and not send_okay._status_is_send.is_set(): + await connection.respond_with_reception_status( + subject_message_id=event.message_id, + status=ReceptionStatusValues.PERMANENT_ERROR, + diagnostic_label=f"While processing message {event.message_id} " + f"an unrecoverable error occurred.", + ) + raise + if send_okay: + await send_okay.ensure_send(type(event)) + else: + logger.warning( + "Received an event of type %s but no handler is registered. Ignoring the event.", + type(event), + ) + + def register_handler( + self, event_type: Type[S2ConnectionEvent], handler: S2MessageHandlerAsync + ) -> None: + """Register a coroutine function or a normal function as the handler for a specific S2 message type. + + :param msg_type: The S2 message type to attach the handler to. + :param handler: The function (asynchronuous or normal) which should handle the S2 message. + """ + self.handlers[event_type] = handler + + def unregister_handler(self, s2_message_type: Type[S2ConnectionEvent]): + if s2_message_type in self.handlers: + del self.handlers[s2_message_type] diff --git a/src/s2python/connection/connection_events.py b/src/s2python/connection/connection_events.py new file mode 100644 index 0000000..9f65d40 --- /dev/null +++ b/src/s2python/connection/connection_events.py @@ -0,0 +1,12 @@ +import abc + + +class S2ConnectionEvent(abc.ABC): + pass + + +class ConnectionStarted(S2ConnectionEvent): + pass + +class ConnectionStopped(S2ConnectionEvent): + pass diff --git a/src/s2python/connection/control_type.py b/src/s2python/connection/control_type.py new file mode 100644 index 0000000..09f6252 --- /dev/null +++ b/src/s2python/connection/control_type.py @@ -0,0 +1,11 @@ +import abc + +from s2python.common import EnergyManagementRole +from s2python.connection.async_.connection import S2AsyncConnection + +class RoleHandler(abc.ABC): + @abc.abstractmethod + def get_s2_role(self) -> EnergyManagementRole: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2AsyncConnection) -> None: ... diff --git a/src/s2python/connection/medium/s2_medium.py b/src/s2python/connection/medium/s2_medium.py deleted file mode 100644 index a24c0c0..0000000 --- a/src/s2python/connection/medium/s2_medium.py +++ /dev/null @@ -1,36 +0,0 @@ -import abc -from typing import AsyncGenerator - -from s2python.s2_parser import UnparsedS2Message - - -class S2MediumException(Exception): - ... - -class MediumClosedConnectionError(S2MediumException): - ... - -class MediumCouldNotConnectError(S2MediumException): - ... - - -class S2Medium(abc.ABC): - @abc.abstractmethod - async def connect(self) -> None: - ... - - @abc.abstractmethod - async def is_connected(self) -> bool: - ... - - @abc.abstractmethod - async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: - ... - - @abc.abstractmethod - async def send(self, message: str) -> None: - ... - - @abc.abstractmethod - async def close(self) -> None: - ... \ No newline at end of file diff --git a/src/s2python/connection/s2_async_connection.py b/src/s2python/connection/s2_async_connection.py deleted file mode 100644 index f8c9ea6..0000000 --- a/src/s2python/connection/s2_async_connection.py +++ /dev/null @@ -1,538 +0,0 @@ -from s2python.connection.medium.s2_medium import S2Medium, MediumClosedConnectionError - -import asyncio -import json -import logging -import time -import threading -import uuid -from dataclasses import dataclass -from typing import Any, Coroutine, Optional, List, Type, Dict, Callable, Awaitable, Union - - - -from s2python.common import ( - ReceptionStatusValues, - ReceptionStatus, - Handshake, - EnergyManagementRole, - Role, - HandshakeResponse, - ResourceManagerDetails, - Duration, - Currency, - SelectControlType, -) -from s2python.generated.gen_s2 import CommodityQuantity -from s2python.reception_status_awaiter import ReceptionStatusAwaiter -from s2python.s2_control_type import S2ControlType -from s2python.s2_parser import S2Parser -from s2python.s2_validation_error import S2ValidationError -from s2python.message import S2Message, S2MessageWithID -from s2python.version import S2_VERSION - -logger = logging.getLogger("s2python") - - - -class CouldNotReceiveStatusReceptionError(Exception): - ... - -@dataclass -class AssetDetails: # pylint: disable=too-many-instance-attributes - resource_id: uuid.UUID - - provides_forecast: bool - provides_power_measurements: List[CommodityQuantity] - - instruction_processing_delay: Duration - roles: List[Role] - currency: Optional[Currency] = None - - name: Optional[str] = None - manufacturer: Optional[str] = None - model: Optional[str] = None - firmware_version: Optional[str] = None - serial_number: Optional[str] = None - - def to_resource_manager_details( - self, control_types: List[S2ControlType] - ) -> ResourceManagerDetails: - return ResourceManagerDetails( - available_control_types=[ - control_type.get_protocol_control_type() - for control_type in control_types - ], - currency=self.currency, - firmware_version=self.firmware_version, - instruction_processing_delay=self.instruction_processing_delay, - manufacturer=self.manufacturer, - message_id=uuid.uuid4(), - model=self.model, - name=self.name, - provides_forecast=self.provides_forecast, - provides_power_measurement_types=self.provides_power_measurements, - resource_id=self.resource_id, - roles=self.roles, - serial_number=self.serial_number, - ) - - -S2MessageHandler = Union[ - Callable[["S2Connection", S2Message, Callable[[], None]], None], - Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]], -] - - -class SendOkay: - status_is_send: threading.Event - connection: "S2Connection" - subject_message_id: uuid.UUID - - def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID): - self.status_is_send = threading.Event() - self.connection = connection - self.subject_message_id = subject_message_id - - async def run_async(self) -> None: - self.status_is_send.set() - - await self.connection._respond_with_reception_status( # pylint: disable=protected-access - subject_message_id=self.subject_message_id, - status=ReceptionStatusValues.OK, - diagnostic_label="Processed okay.", - ) - - def run_sync(self) -> None: - self.status_is_send.set() - - self.connection.respond_with_reception_status_sync( - subject_message_id=self.subject_message_id, - status=ReceptionStatusValues.OK, - diagnostic_label="Processed okay.", - ) - - async def ensure_send_async(self, type_msg: Type[S2Message]) -> None: - if not self.status_is_send.is_set(): - logger.warning( - "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " - "Sending it now.", - type_msg, - self.subject_message_id, - ) - await self.run_async() - - def ensure_send_sync(self, type_msg: Type[S2Message]) -> None: - if not self.status_is_send.is_set(): - logger.warning( - "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " - "Sending it now.", - type_msg, - self.subject_message_id, - ) - self.run_sync() - - -class MessageHandlers: - handlers: Dict[Type[S2Message], S2MessageHandler] - - def __init__(self) -> None: - self.handlers = {} - - async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None: - """Handle the S2 message using the registered handler. - - :param connection: The S2 conncetion the `msg` is received from. - :param msg: The S2 message - """ - handler = self.handlers.get(type(msg)) - if handler is not None: - send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr] - - try: - if asyncio.iscoroutinefunction(handler): - await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type] - await send_okay.ensure_send_async(type(msg)) - else: - - def do_message() -> None: - handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type] - send_okay.ensure_send_sync(type(msg)) - - eventloop = asyncio.get_event_loop() - await eventloop.run_in_executor(executor=None, func=do_message) - except Exception: - if not send_okay.status_is_send.is_set(): - await connection._respond_with_reception_status( # pylint: disable=protected-access - subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr] - status=ReceptionStatusValues.PERMANENT_ERROR, - diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long - f"an unrecoverable error occurred.", - ) - raise - else: - logger.warning( - "Received a message of type %s but no handler is registered. Ignoring the message.", - type(msg), - ) - - def register_handler( - self, msg_type: Type[S2Message], handler: S2MessageHandler - ) -> None: - """Register a coroutine function or a normal function as the handler for a specific S2 message type. - - :param msg_type: The S2 message type to attach the handler to. - :param handler: The function (asynchronuous or normal) which should handle the S2 message. - """ - self.handlers[msg_type] = handler - - -class S2AsyncRM: - connection: 'S2AsyncConnection' - - def __init__(self): - self.connection._handlers.register_handler( - SelectControlType, self._handle_select_control_type - ) - self.connection._handlers.register_handler(Handshake, self._handle_handshake) - self.connection._handlers.register_handler(HandshakeResponse, self._handle_handshake_response) - - async def _connect_as_rm(self) -> None: - await self.connection.send_msg_and_await_reception_status( - Handshake( - message_id=uuid.uuid4(), - role=self.role, - supported_protocol_versions=[S2_VERSION], - ) - ) - logger.debug( - "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." - ) - - async def _handle_handshake( - self, connection: "S2AsyncConnection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, Handshake): - logger.error( - "Handler for Handshake received a message of the wrong type: %s", - type(message), - ) - return - - logger.debug( - "%s supports S2 protocol versions: %s", - message.role, - message.supported_protocol_versions, - ) - await send_okay - - async def _handle_handshake_response( - self, connection: "S2AsyncConnection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, HandshakeResponse): - logger.error( - "Handler for HandshakeResponse received a message of the wrong type: %s", - type(message), - ) - return - - logger.debug("Received HandshakeResponse %s", message.to_json()) - - logger.debug( - "CEM selected to use version %s", message.selected_protocol_version - ) - await send_okay - logger.debug("Handshake complete. Sending first ResourceManagerDetails.") - - await connection.send_msg_and_await_reception_status( - self.asset_details.to_resource_manager_details(self.control_types) - ) - - async def _handle_select_control_type( - self, connection: "S2AsyncConnection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, SelectControlType): - logger.error( - "Handler for SelectControlType received a message of the wrong type: %s", - type(message), - ) - return - - await send_okay - - logger.debug( - "CEM selected control type %s. Activating control type.", - message.control_type, - ) - - control_types_by_protocol_name = { - c.get_protocol_control_type(): c for c in self.control_types - } - selected_control_type: Optional[S2ControlType] = ( - control_types_by_protocol_name.get(message.control_type) - ) - - if self._current_control_type is not None: - await self._eventloop.run_in_executor( - None, self._current_control_type.deactivate, self - ) - - self._current_control_type = selected_control_type - - if self._current_control_type is not None: - await self._eventloop.run_in_executor( - None, self._current_control_type.activate, self - ) - self._current_control_type.register_handlers(self._handlers) - - -class S2AsyncConnection: # pylint: disable=too-many-instance-attributes - url: str - reconnect: bool - reception_status_awaiter: ReceptionStatusAwaiter - medium: S2Medium - s2_parser: S2Parser - control_types: List[S2ControlType] - role: EnergyManagementRole - asset_details: AssetDetails - - _handlers: MessageHandlers - _current_control_type: Optional[S2ControlType] - _received_messages: asyncio.Queue - - _eventloop: asyncio.AbstractEventLoop - _main_task: Optional[asyncio.Task] - _stop_event: asyncio.Event - """Stop the S2 connection permanently.""" - _restart_connection_event: asyncio.Event - """Stop the S2 connection but restart if configured.""" - _verify_certificate: bool - _bearer_token: Optional[str] - - def __init__( # pylint: disable=too-many-arguments - self, - url: str, - role: EnergyManagementRole, - control_types: List[S2ControlType], - asset_details: AssetDetails, - medium: S2Medium, - reconnect: bool = False, - verify_certificate: bool = True, - bearer_token: Optional[str] = None, - eventloop: Optional[asyncio.AbstractEventLoop] = None, - ) -> None: - self.url = url - self.reconnect = reconnect - self.reception_status_awaiter = ReceptionStatusAwaiter() - self.medium = medium - self.s2_parser = S2Parser() - - self._handlers = MessageHandlers() - self._current_control_type = None - - self._eventloop = eventloop if eventloop is not None else asyncio.get_event_loop() - - self.control_types = control_types - self.role = role - self.asset_details = asset_details - self._verify_certificate = verify_certificate - - self._main_task = None - self._bearer_token = bearer_token - - async def start_as_rm(self) -> None: - """Start this connection as a S2 resource manager and connect to a S2 CEM. - - This method will return until the connection is stopped. - """ - logger.debug('Starting S2 connection as RM.') - - self._main_task = self._eventloop.create_task(self._run_as(self._connect_as_rm())) - - async def stop(self) -> None: - """Stop the S2 connection gracefully and wait till it stops. - - Note: Not thread-safe. Must be run from the same event loop as `start_as_rm` runs in. - """ - logger.info("Will stop the S2 connection.") - self._stop_event.set() - if self._main_task is not None: - await self._main_task - - async def _run_as(self, role_task: Coroutine[None, None, None]) -> None: - logger.debug("Connecting as S2 resource manager.") - - self._stop_event = asyncio.Event() - - first_run = True - - while (first_run or self.reconnect) and not self._stop_event.is_set(): - if not first_run: - time.sleep(1) - first_run = False - self._restart_connection_event = asyncio.Event() - await self._connect_and_run(role_task) - - logger.debug("Finished S2 connection.") - - async def _wait_till_stop(self) -> None: - await self._stop_event.wait() - - async def _wait_till_connection_restart(self) -> None: - await self._restart_connection_event.wait() - - async def _connect_and_run(self, role_task: Coroutine[None, None, None]) -> None: - self._received_messages = asyncio.Queue() - await self.medium.connect() - if self.medium.is_connected(): - background_tasks = [ - self._eventloop.create_task(self._receive_messages()), - self._eventloop.create_task(self._wait_till_stop()), - self._eventloop.create_task(self._handle_received_messages()), - self._eventloop.create_task(self._wait_till_connection_restart()), - ] - - (done, pending) = await asyncio.wait( - background_tasks, return_when=asyncio.FIRST_COMPLETED - ) - if self._current_control_type: - self._current_control_type.deactivate(self) - self._current_control_type = None - - for task in done: - try: - await task - except asyncio.CancelledError: - pass - except MediumClosedConnectionError: - logger.info("The other party closed the websocket connection.") - except Exception: - logger.exception("An error occurred in the S2 connection. Terminating current connection.") - - for task in pending: - try: - task.cancel() - await task - except (asyncio.CancelledError, Exception): - pass - - await self.medium.close() - - async def _handle_received_messages(self) -> None: - while True: - msg = await self._received_messages.get() - await self._handlers.handle_message(self, msg) - - async def _receive_messages(self) -> None: - """Receives all incoming messages in the form of a generator. - - Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed - to any calls of `send_msg_and_await_reception_status`. - """ - if self.medium is None: - raise RuntimeError( - "Cannot receive messages if websocket connection is not yet established." - ) - - logger.info("S2 connection has started to receive messages.") - - async for message in await self.medium.messages(): - try: - s2_msg: S2Message = self.s2_parser.parse_as_any_message(message) - except json.JSONDecodeError: - await self.send_and_forget( - ReceptionStatus( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), - status=ReceptionStatusValues.INVALID_DATA, - diagnostic_label="Not valid json.", - ) - ) - except S2ValidationError as e: - json_msg = json.loads(message) - message_id = json_msg.get("message_id") - if message_id: - await self.respond_with_reception_status( - subject_message_id=message_id, - status=ReceptionStatusValues.INVALID_MESSAGE, - diagnostic_label=str(e), - ) - else: - await self.respond_with_reception_status( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), - status=ReceptionStatusValues.INVALID_DATA, - diagnostic_label="Message appears valid json but could not find a message_id field.", - ) - else: - logger.debug("Received message %s", s2_msg.to_json()) - - if isinstance(s2_msg, ReceptionStatus): - logger.debug( - "Message is a reception status for %s so registering in cache.", - s2_msg.subject_message_id, - ) - await self.reception_status_awaiter.receive_reception_status(s2_msg) - else: - await self._received_messages.put(s2_msg) - - async def send_and_forget(self, s2_msg: S2Message) -> None: - if self.medium is None: - raise RuntimeError( - "Cannot send messages if the S2 medium is not yet established." - ) - - json_msg = s2_msg.to_json() - logger.debug("Sending message %s", json_msg) - try: - await self.medium.send(json_msg) - except MediumClosedConnectionError as e: - logger.error("Unable to send message %s due to %s", s2_msg, str(e)) - self._restart_connection_event.set() - - async def respond_with_reception_status( - self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str - ) -> None: - logger.debug( - "Responding to message %s with status %s", subject_message_id, status - ) - await self.send_and_forget( - ReceptionStatus( - subject_message_id=subject_message_id, - status=status, - diagnostic_label=diagnostic_label, - ) - ) - - async def send_msg_and_await_reception_status( - self, - s2_msg: S2MessageWithID, - timeout_reception_status: float = 5.0, - raise_on_error: bool = True, - ) -> ReceptionStatus: - await self.send_and_forget(s2_msg) - logger.debug( - "Waiting for ReceptionStatus for %s %s seconds", - s2_msg.message_id, - timeout_reception_status, - ) - try: - reception_status_task = self._eventloop.create_task(self.reception_status_awaiter.wait_for_reception_status( - s2_msg.message_id, timeout_reception_status - )) - restart_event_task = self._eventloop.create_task(self._restart_connection_event.wait()) - - (done, pending) = await asyncio.wait([reception_status_task, restart_event_task], return_when=asyncio.FIRST_COMPLETED) - - if reception_status_task in done: - reception_status = await reception_status_task - else: - raise CouldNotReceiveStatusReceptionError(f"Connection restarted while waiting for ReceptionStatus for message {s2_msg.message_id}") - #TODO Still need to cancel pending tasks? - except TimeoutError: - logger.error("Did not receive a reception status on time for %s",s2_msg.message_id) - self._restart_connection_event.set() - raise - - if reception_status.status != ReceptionStatusValues.OK and raise_on_error: - raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}") - - return reception_status diff --git a/src/s2python/connection/s2_sync_connection.py b/src/s2python/connection/s2_sync_connection.py deleted file mode 100644 index 38579c0..0000000 --- a/src/s2python/connection/s2_sync_connection.py +++ /dev/null @@ -1,397 +0,0 @@ -from s2python.common import ReceptionStatus -from s2python.message import S2MessageWithID - - -class S2Connection: # pylint: disable=too-many-instance-attributes - url: str - reconnect: bool - reception_status_awaiter: ReceptionStatusAwaiter - ws: Optional[WSConnection] - s2_parser: S2Parser - control_types: List[S2ControlType] - role: EnergyManagementRole - asset_details: AssetDetails - - _thread: threading.Thread - - _handlers: MessageHandlers - _current_control_type: Optional[S2ControlType] - _received_messages: asyncio.Queue - - _eventloop: asyncio.AbstractEventLoop - _stop_event: asyncio.Event - _restart_connection_event: asyncio.Event - _verify_certificate: bool - _bearer_token: Optional[str] - - def __init__( # pylint: disable=too-many-arguments - self, - url: str, - role: EnergyManagementRole, - control_types: List[S2ControlType], - asset_details: AssetDetails, - reconnect: bool = False, - verify_certificate: bool = True, - bearer_token: Optional[str] = None, - ) -> None: - self.url = url - self.reconnect = reconnect - self.reception_status_awaiter = ReceptionStatusAwaiter() - self.ws = None - self.s2_parser = S2Parser() - - self._handlers = MessageHandlers() - self._current_control_type = None - - self._eventloop = asyncio.new_event_loop() - - self.control_types = control_types - self.role = role - self.asset_details = asset_details - self._verify_certificate = verify_certificate - - self._handlers.register_handler( - SelectControlType, self._handle_select_control_type_as_rm - ) - self._handlers.register_handler(Handshake, self._handle_handshake) - self._handlers.register_handler(HandshakeResponse, self._handle_handshake_response_as_rm) - self._bearer_token = bearer_token - - def start_as_rm(self) -> None: - self._run_eventloop(self._run_as_rm()) - - def _run_eventloop(self, main_task: Awaitable[None]) -> None: - self._thread = threading.current_thread() - logger.debug("Starting eventloop") - try: - self._eventloop.run_until_complete(main_task) - except asyncio.CancelledError: - pass - logger.debug("S2 connection thread has stopped.") - - def stop(self) -> None: - """Stops the S2 connection. - - Note: Ensure this method is called from a different thread than the thread running the S2 connection. - Otherwise it will block waiting on the coroutine _do_stop to terminate successfully but it can't run - the coroutine. A `RuntimeError` will be raised to prevent the indefinite block. - """ - if threading.current_thread() == self._thread: - raise RuntimeError( - "Do not call stop from the thread running the S2 connection. This results in an infinite block!" - ) - if self._eventloop.is_running(): - asyncio.run_coroutine_threadsafe(self._do_stop(), self._eventloop).result() - self._thread.join() - logger.info("Stopped the S2 connection.") - - async def _do_stop(self) -> None: - logger.info("Will stop the S2 connection.") - self._stop_event.set() - - async def _run_as_rm(self) -> None: - logger.debug("Connecting as S2 resource manager.") - - self._stop_event = asyncio.Event() - - first_run = True - - while (first_run or self.reconnect) and not self._stop_event.is_set(): - first_run = False - self._restart_connection_event = asyncio.Event() - await self._connect_and_run() - time.sleep(1) - - logger.debug("Finished S2 connection eventloop.") - - async def _connect_and_run(self) -> None: - self._received_messages = asyncio.Queue() - await self._connect_ws() - if self.ws: - - async def wait_till_stop() -> None: - await self._stop_event.wait() - - async def wait_till_connection_restart() -> None: - await self._restart_connection_event.wait() - - background_tasks = [ - self._eventloop.create_task(self._receive_messages()), - self._eventloop.create_task(wait_till_stop()), - self._eventloop.create_task(self._connect_as_rm()), - self._eventloop.create_task(wait_till_connection_restart()), - ] - - (done, pending) = await asyncio.wait( - background_tasks, return_when=asyncio.FIRST_COMPLETED - ) - if self._current_control_type: - self._current_control_type.deactivate(self) - self._current_control_type = None - - for task in done: - try: - await task - except asyncio.CancelledError: - pass - except ( - websockets.ConnectionClosedError, - websockets.ConnectionClosedOK, - ): - logger.info("The other party closed the websocket connection.") - - for task in pending: - try: - task.cancel() - await task - except asyncio.CancelledError: - pass - - await self.ws.close() - await self.ws.wait_closed() - - async def _connect_ws(self) -> None: - try: - # set up connection arguments for SSL and bearer token, if required - connection_kwargs: Dict[str, Any] = {} - if self.url.startswith("wss://") and not self._verify_certificate: - connection_kwargs["ssl"] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - connection_kwargs["ssl"].check_hostname = False - connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE - - if self._bearer_token: - connection_kwargs["additional_headers"] = { - "Authorization": f"Bearer {self._bearer_token}" - } - - self.ws = await ws_connect(uri=self.url, **connection_kwargs) - except (EOFError, OSError) as e: - logger.info("Could not connect due to: %s", str(e)) - - async def _connect_as_rm(self) -> None: - await self._send_msg_and_await_reception_status_async( - Handshake( - message_id=uuid.uuid4(), - role=self.role, - supported_protocol_versions=[S2_VERSION], - ) - ) - logger.debug( - "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." - ) - - await self._handle_received_messages() - - async def _handle_handshake( - self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, Handshake): - logger.error( - "Handler for Handshake received a message of the wrong type: %s", - type(message), - ) - return - - logger.debug( - "%s supports S2 protocol versions: %s", - message.role, - message.supported_protocol_versions, - ) - await send_okay - - async def _handle_handshake_response_as_rm( - self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, HandshakeResponse): - logger.error( - "Handler for HandshakeResponse received a message of the wrong type: %s", - type(message), - ) - return - - logger.debug("Received HandshakeResponse %s", message.to_json()) - - logger.debug( - "CEM selected to use version %s", message.selected_protocol_version - ) - await send_okay - logger.debug("Handshake complete. Sending first ResourceManagerDetails.") - - await self._send_msg_and_await_reception_status_async( - self.asset_details.to_resource_manager_details(self.control_types) - ) - - async def _handle_select_control_type_as_rm( - self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, SelectControlType): - logger.error( - "Handler for SelectControlType received a message of the wrong type: %s", - type(message), - ) - return - - await send_okay - - logger.debug( - "CEM selected control type %s. Activating control type.", - message.control_type, - ) - - control_types_by_protocol_name = { - c.get_protocol_control_type(): c for c in self.control_types - } - selected_control_type: Optional[S2ControlType] = ( - control_types_by_protocol_name.get(message.control_type) - ) - - if self._current_control_type is not None: - await self._eventloop.run_in_executor( - None, self._current_control_type.deactivate, self - ) - - self._current_control_type = selected_control_type - - if self._current_control_type is not None: - await self._eventloop.run_in_executor( - None, self._current_control_type.activate, self - ) - self._current_control_type.register_handlers(self._handlers) - - async def _receive_messages(self) -> None: - """Receives all incoming messages in the form of a generator. - - Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed - to any calls of `send_msg_and_await_reception_status`. - """ - if self.ws is None: - raise RuntimeError( - "Cannot receive messages if websocket connection is not yet established." - ) - - logger.info("S2 connection has started to receive messages.") - - async for message in self.ws: - try: - s2_msg: S2Message = self.s2_parser.parse_as_any_message(message) - except json.JSONDecodeError: - await self._send_and_forget( - ReceptionStatus( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), - status=ReceptionStatusValues.INVALID_DATA, - diagnostic_label="Not valid json.", - ) - ) - except S2ValidationError as e: - json_msg = json.loads(message) - message_id = json_msg.get("message_id") - if message_id: - await self._respond_with_reception_status( - subject_message_id=message_id, - status=ReceptionStatusValues.INVALID_MESSAGE, - diagnostic_label=str(e), - ) - else: - await self._respond_with_reception_status( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), - status=ReceptionStatusValues.INVALID_DATA, - diagnostic_label="Message appears valid json but could not find a message_id field.", - ) - else: - logger.debug("Received message %s", s2_msg.to_json()) - - if isinstance(s2_msg, ReceptionStatus): - logger.debug( - "Message is a reception status for %s so registering in cache.", - s2_msg.subject_message_id, - ) - await self.reception_status_awaiter.receive_reception_status(s2_msg) - else: - await self._received_messages.put(s2_msg) - - async def _send_and_forget(self, s2_msg: S2Message) -> None: - if self.ws is None: - raise RuntimeError( - "Cannot send messages if websocket connection is not yet established." - ) - - json_msg = s2_msg.to_json() - logger.debug("Sending message %s", json_msg) - try: - await self.ws.send(json_msg) - except websockets.ConnectionClosedError as e: - logger.error("Unable to send message %s due to %s", s2_msg, str(e)) - self._restart_connection_event.set() - - async def _respond_with_reception_status( - self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str - ) -> None: - logger.debug( - "Responding to message %s with status %s", subject_message_id, status - ) - await self._send_and_forget( - ReceptionStatus( - subject_message_id=subject_message_id, - status=status, - diagnostic_label=diagnostic_label, - ) - ) - - def respond_with_reception_status_sync( - self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str - ) -> None: - asyncio.run_coroutine_threadsafe( - self._respond_with_reception_status( - subject_message_id, status, diagnostic_label - ), - self._eventloop, - ).result() - - async def _send_msg_and_await_reception_status_async( - self, - s2_msg: S2MessageWithID, - timeout_reception_status: float = 5.0, - raise_on_error: bool = True, - ) -> ReceptionStatus: - await self._send_and_forget(s2_msg) - logger.debug( - "Waiting for ReceptionStatus for %s %s seconds", - s2_msg.message_id, # type: ignore[attr-defined, union-attr] - timeout_reception_status, - ) - try: - reception_status = await self.reception_status_awaiter.wait_for_reception_status( - s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined, union-attr] - ) - except TimeoutError: - logger.error( - "Did not receive a reception status on time for %s", - s2_msg.message_id, # type: ignore[attr-defined, union-attr] - ) - self._stop_event.set() - raise - - if reception_status.status != ReceptionStatusValues.OK and raise_on_error: - raise RuntimeError( - f"ReceptionStatus was not OK but rather {reception_status.status}" - ) - - return reception_status - - def send_msg_and_await_reception_status_sync( - self, - s2_msg: S2MessageWithID, - timeout_reception_status: float = 5.0, - raise_on_error: bool = True, - ) -> ReceptionStatus: - return asyncio.run_coroutine_threadsafe( - self._send_msg_and_await_reception_status_async( - s2_msg, timeout_reception_status, raise_on_error - ), - self._eventloop, - ).result() - - async def _handle_received_messages(self) -> None: - while True: - msg = await self._received_messages.get() - await self._handlers.handle_message(self, msg) \ No newline at end of file diff --git a/src/s2python/connection/sync/connection.py b/src/s2python/connection/sync/connection.py new file mode 100644 index 0000000..a762ef8 --- /dev/null +++ b/src/s2python/connection/sync/connection.py @@ -0,0 +1,127 @@ +import asyncio +import logging +import threading +import uuid +from typing import Any, Coroutine, Optional, List, Type, Callable + +from s2python.common import ( + ReceptionStatusValues, + EnergyManagementRole, +) +from s2python.connection.asset_details import AssetDetails +from s2python.connection.types import S2MessageHandlerSync, S2ConnectionEvent +from s2python.s2_control_type import S2ControlType +from s2python.message import S2Message + +from s2python.common import ReceptionStatus +from s2python.connection.medium.s2_medium import S2MediumConnectionAsync +from s2python.connection.s2_async_connection import S2AsyncConnection +from s2python.message import S2MessageWithID + +logger = logging.getLogger("s2python") + +S2EventHandlerSync = Callable[["S2SyncConnection", S2ConnectionEvent, Optional[Callable[[], None]]], None] + + + +class S2SyncConnection: + _thread: threading.Thread + _eventloop: asyncio.AbstractEventLoop + _async_s2_connection: S2AsyncConnection + + def __init__( # pylint: disable=too-many-arguments + self, + role: EnergyManagementRole, + control_types: List[S2ControlType], + asset_details: AssetDetails, + medium: S2MediumConnectionAsync, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self._thread = threading.Thread(target=self._run_eventloop) + self._eventloop = asyncio.new_event_loop() if eventloop is None else eventloop + self._async_s2_connection = S2AsyncConnection(role, control_types, asset_details, medium, self._eventloop) + + def start_as_rm(self) -> None: + self._thread.start() + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.start_as_rm(), + self._eventloop, + ).result() + + def _run_eventloop(self) -> None: + logger.debug("Starting synchronous S2 connection event loop in thread %s", self._thread.name) + self._eventloop.run_forever() + logger.debug("Synchronous S2 connection event loop in thread %s has stopped", self._thread.name) + + def stop(self) -> None: + """Stops the S2 connection. + + Note: Ensure this method is called from a different thread than the thread running the S2 connection. + Otherwise it will block waiting on the coroutine _do_stop to terminate successfully but it can't run + the coroutine. A `RuntimeError` will be raised to prevent the indefinite block. + """ + if threading.current_thread() == self._thread: + raise RuntimeError( + "Do not call stop from the thread running the S2 connection. This results in an infinite block!" + ) + if self._eventloop.is_running(): + asyncio.run_coroutine_threadsafe(self._async_s2_connection.stop(), self._eventloop).result() + self._eventloop.stop() + self._thread.join() + logger.info("Stopped the S2 connection.") + + def register_handler(self, s2_message_type: Type[S2MessageWithID], handler: S2MessageHandlerSync) -> None: + """Register a handler for a specific S2 message type. + + :param s2_message_type: The S2 message type to register the handler for. + :param handler: The handler function (asynchronous or normal) which will handle the message. + """ + + async def handle_s2_message_async_wrapper( + _: S2AsyncConnection, + s2_msg: S2ConnectionEvent, + send_okay: Coroutine[Any, Any, None], + ) -> None: + await self._eventloop.run_in_executor( + None, + handler, + self, + s2_msg, + lambda: asyncio.run_coroutine_threadsafe(send_okay, self._eventloop).result(), + ) + + self._async_s2_connection.register_handler(s2_message_type, handle_s2_message_async_wrapper) + + def unregister_handler(self, s2_message_type: Type[S2MessageWithID]) -> None: + self._async_s2_connection.unregister_handler(s2_message_type) + + def send_and_forget( + self, s2_msg: S2Message + ) -> None: + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.send_and_forget(s2_msg), + self._eventloop, + ).result() + + def respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.respond_with_reception_status( + subject_message_id, status, diagnostic_label + ), + self._eventloop, + ).result() + + def send_msg_and_await_reception_status( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + return asyncio.run_coroutine_threadsafe( + self._async_s2_connection.send_msg_and_await_reception_status( + s2_msg, timeout_reception_status, raise_on_error + ), + self._eventloop, + ).result() diff --git a/src/s2python/connection/sync/control_type/control_types.py b/src/s2python/connection/sync/control_type/control_types.py new file mode 100644 index 0000000..e69de29 diff --git a/src/s2python/connection/types.py b/src/s2python/connection/types.py new file mode 100644 index 0000000..abbf59e --- /dev/null +++ b/src/s2python/connection/types.py @@ -0,0 +1,8 @@ +from s2python.connection.connection_events import S2ConnectionEvent + +from typing import Callable, Union, Coroutine, Any, Optional + +from s2python.message import S2MessageWithID + + +S2ConnectionEventsAndMessages = Union[S2MessageWithID, S2ConnectionEvent] diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py index b5d1ab0..896368e 100644 --- a/src/s2python/s2_connection.py +++ b/src/s2python/s2_connection.py @@ -32,7 +32,6 @@ Currency, SelectControlType, ) -from s2python.generated.gen_s2 import CommodityQuantity from s2python.reception_status_awaiter import ReceptionStatusAwaiter from s2python.s2_control_type import S2ControlType from s2python.s2_parser import S2Parser @@ -43,44 +42,7 @@ logger = logging.getLogger("s2python") -@dataclass -class AssetDetails: # pylint: disable=too-many-instance-attributes - resource_id: uuid.UUID - - provides_forecast: bool - provides_power_measurements: List[CommodityQuantity] - - instruction_processing_delay: Duration - roles: List[Role] - currency: Optional[Currency] = None - - name: Optional[str] = None - manufacturer: Optional[str] = None - model: Optional[str] = None - firmware_version: Optional[str] = None - serial_number: Optional[str] = None - - def to_resource_manager_details( - self, control_types: List[S2ControlType] - ) -> ResourceManagerDetails: - return ResourceManagerDetails( - available_control_types=[ - control_type.get_protocol_control_type() - for control_type in control_types - ], - currency=self.currency, - firmware_version=self.firmware_version, - instruction_processing_delay=self.instruction_processing_delay, - manufacturer=self.manufacturer, - message_id=uuid.uuid4(), - model=self.model, - name=self.name, - provides_forecast=self.provides_forecast, - provides_power_measurement_types=self.provides_power_measurements, - resource_id=self.resource_id, - roles=self.roles, - serial_number=self.serial_number, - ) + S2MessageHandler = Union[ diff --git a/src/s2python/s2_control_type.py b/src/s2python/s2_control_type.py index 135f775..02e51bc 100644 --- a/src/s2python/s2_control_type.py +++ b/src/s2python/s2_control_type.py @@ -11,106 +11,3 @@ from s2python.s2_connection import S2Connection, MessageHandlers -class S2ControlType(abc.ABC): - @abc.abstractmethod - def get_protocol_control_type(self) -> ProtocolControlType: ... - - @abc.abstractmethod - def register_handlers(self, handlers: "MessageHandlers") -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... - - -class FRBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.FILL_RATE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(FRBCInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class PPBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.POWER_PROFILE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(PPBCScheduleInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class OMBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.OPERATION_MODE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(OMBCInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class PEBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - pass - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... - - -class NoControlControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.NOT_CONTROLABLE - - def register_handlers(self, handlers: "MessageHandlers") -> None: - pass - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... From 69ebb060b90e9429ea193d7ec4aa002bc0da3147 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Fri, 31 Oct 2025 16:11:26 +0100 Subject: [PATCH 3/3] 139: Get async mostly to work, started on getting sync to work. --- examples/async_frbc_rm.py | 32 ++- examples/sync_frbc_rm.py | 208 +++++++++++++++ src/s2python/connection/__init__.py | 1 + src/s2python/connection/asset_details.py | 14 +- src/s2python/connection/async_/connection.py | 18 +- .../async_/control_type/class_based.py | 11 +- .../medium/__init__.py} | 0 .../connection/async_/message_handlers.py | 20 +- src/s2python/connection/sync/__init__.py | 2 + src/s2python/connection/sync/connection.py | 29 +-- .../sync/control_type/class_based.py | 241 ++++++++++++++++++ 11 files changed, 522 insertions(+), 54 deletions(-) create mode 100644 examples/sync_frbc_rm.py rename src/s2python/connection/{sync/control_type/control_types.py => async_/medium/__init__.py} (100%) create mode 100644 src/s2python/connection/sync/__init__.py create mode 100644 src/s2python/connection/sync/control_type/class_based.py diff --git a/examples/async_frbc_rm.py b/examples/async_frbc_rm.py index 5ddf984..2046e02 100644 --- a/examples/async_frbc_rm.py +++ b/examples/async_frbc_rm.py @@ -6,8 +6,9 @@ import uuid import signal import datetime -from typing import Callable +from typing import Callable, Optional, Coroutine, Any +from s2python.connection.types import S2ConnectionEventsAndMessages from s2python.common import ( Duration, Role, @@ -34,7 +35,6 @@ from s2python.connection.async_ import S2AsyncConnection from s2python.connection.async_.medium.websocket import WebsocketClientMedium from s2python.connection.async_.control_type.class_based import FRBCControlType, NoControlControlType, ResourceManagerHandler -from s2python.message import S2Message logger = logging.getLogger("s2python") logger.addHandler(logging.StreamHandler(sys.stdout)) @@ -43,7 +43,7 @@ class MyFRBCControlType(FRBCControlType): async def handle_instruction( - self, connection: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] ) -> None: if not isinstance(msg, FRBCInstruction): raise RuntimeError( @@ -154,7 +154,7 @@ def stop(s2_connection, signal_num, _current_stack_frame): s2_connection.stop() -async def start_s2_session(url, client_node_id=uuid.uuid4()): +async def start_s2_session(url, client_node_id: uuid.UUID): # Configure a resource manager rm_handler = ResourceManagerHandler( asset_details=AssetDetails( @@ -178,18 +178,30 @@ async def start_s2_session(url, client_node_id=uuid.uuid4()): rm_handler.register_handlers(s2_conn) await s2_conn.start() - signal.signal(signal.SIGINT, partial(stop, s2_conn)) - signal.signal(signal.SIGTERM, partial(stop, s2_conn)) + stop_event = asyncio.Event() + eventloop = asyncio.get_running_loop() + + async def stop(): + print(f"Received signal. Will stop S2 connection.") + stop_event.set() + + eventloop.add_signal_handler(signal.SIGINT, lambda: eventloop.create_task(stop())) + eventloop.add_signal_handler(signal.SIGTERM, lambda: eventloop.create_task(stop())) + await stop_event.wait() + + await s2_conn.stop() if __name__ == "__main__": parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") + client_node_id = uuid.uuid4() parser.add_argument( - "endpoint", + "--endpoint", type=str, - help="WebSocket endpoint uri for the server (CEM) e.g. " - "ws://localhost:8080/backend/rm/s2python-frbc/cem/dummy_model/ws", + required=False, + help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8000/ws/{client_node_id}", + default=f"ws://localhost:8000/ws/{client_node_id}", ) args = parser.parse_args() - asyncio.run(start_s2_session(args.endpoint)) + asyncio.run(start_s2_session(args.endpoint, client_node_id)) diff --git a/examples/sync_frbc_rm.py b/examples/sync_frbc_rm.py new file mode 100644 index 0000000..136fdd7 --- /dev/null +++ b/examples/sync_frbc_rm.py @@ -0,0 +1,208 @@ +import argparse +import asyncio +import threading +import logging +import sys +import uuid +import signal +import datetime +from typing import Callable, Optional + +from s2python.common import ( + Duration, + Role, + RoleType, + Commodity, + Currency, + NumberRange, + PowerRange, + CommodityQuantity, +) +from s2python.connection.types import S2ConnectionEventsAndMessages +from s2python.frbc import ( + FRBCInstruction, + FRBCSystemDescription, + FRBCActuatorDescription, + FRBCStorageDescription, + FRBCOperationMode, + FRBCOperationModeElement, + FRBCFillLevelTargetProfile, + FRBCFillLevelTargetProfileElement, + FRBCStorageStatus, + FRBCActuatorStatus, +) +from s2python.connection import AssetDetails +from s2python.connection.sync import S2SyncConnection +from s2python.connection.async_.medium.websocket import WebsocketClientMedium +from s2python.connection.sync.control_type.class_based import FRBCControlType, NoControlControlType, ResourceManagerHandler + +logger = logging.getLogger("s2python") +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) + + +class MyFRBCControlType(FRBCControlType): + def handle_instruction( + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] + ) -> None: + if not isinstance(msg, FRBCInstruction): + raise RuntimeError( + f"Expected an FRBCInstruction but received a message of type {type(msg)}." + ) + print(f"I have received the message {msg} from {connection}") + + def activate(self, connection: S2SyncConnection) -> None: + print("The control type FRBC is now activated.") + + print("Time to send a FRBC SystemDescription") + actuator_id = uuid.uuid4() + operation_mode_id = uuid.uuid4() + connection.send_msg_and_await_reception_status( + FRBCSystemDescription( + message_id=uuid.uuid4(), + valid_from=datetime.datetime.now(tz=datetime.timezone.utc), + actuators=[ + FRBCActuatorDescription( + id=actuator_id, + operation_modes=[ + FRBCOperationMode( + id=operation_mode_id, + elements=[ + FRBCOperationModeElement( + fill_level_range=NumberRange( + start_of_range=0.0, end_of_range=100.0 + ), + fill_rate=NumberRange( + start_of_range=-5.0, end_of_range=5.0 + ), + power_ranges=[ + PowerRange( + start_of_range=-200.0, + end_of_range=200.0, + commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1, + ) + ], + ) + ], + diagnostic_label="Load & unload battery", + abnormal_condition_only=False, + ) + ], + transitions=[], + timers=[], + supported_commodities=[Commodity.ELECTRICITY], + ) + ], + storage=FRBCStorageDescription( + fill_level_range=NumberRange(start_of_range=0.0, end_of_range=100.0), + fill_level_label="%", + diagnostic_label="Imaginary battery", + provides_fill_level_target_profile=True, + provides_leakage_behaviour=False, + provides_usage_forecast=False, + ), + ) + ) + print("Also send the target profile") + + connection.send_msg_and_await_reception_status( + FRBCFillLevelTargetProfile( + message_id=uuid.uuid4(), + start_time=datetime.datetime.now(tz=datetime.timezone.utc), + elements=[ + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(30_000), + fill_level_range=NumberRange(start_of_range=20.0, end_of_range=30.0), + ), + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(300_000), + fill_level_range=NumberRange(start_of_range=40.0, end_of_range=50.0), + ), + ], + ) + ) + + print("Also send the storage status.") + connection.send_msg_and_await_reception_status( + FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0) + ) + + print("Also send the actuator status.") + connection.send_msg_and_await_reception_status( + FRBCActuatorStatus( + message_id=uuid.uuid4(), + actuator_id=actuator_id, + active_operation_mode_id=operation_mode_id, + operation_mode_factor=0.5, + ) + ) + + def deactivate(self, connection: S2SyncConnection) -> None: + print("The control type FRBC is now deactivated.") + + +class MyNoControlControlType(NoControlControlType): + def activate(self, connection: S2SyncConnection) -> None: + print("The control type NoControl is now activated.") + + def deactivate(self, connection: S2SyncConnection) -> None: + print("The control type NoControl is now deactivated.") + + +def stop(s2_connection, signal_num, _current_stack_frame): + print(f"Received signal {signal_num}. Will stop S2 connection.") + s2_connection.stop() + + +def start_s2_session(url, client_node_id: uuid.UUID): + # Configure a resource manager + rm_handler = ResourceManagerHandler( + asset_details=AssetDetails( + resource_id=client_node_id, + name="Some asset", + instruction_processing_delay=Duration.from_milliseconds(20), + roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)], + currency=Currency.EUR, + provides_forecast=False, + provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], + ), + control_types=[MyFRBCControlType(), MyNoControlControlType()] + ) + + # Setup the underlying websocket connection + ws_medium = WebsocketClientMedium(url=url, verify_certificate=False) + + eventloop = asyncio.get_event_loop() + eventloop.run_until_complete(ws_medium.connect()) + + # Configure the S2 connection on top of the websocket connection + s2_conn = S2SyncConnection(medium=ws_medium) + rm_handler.register_handlers(s2_conn) + s2_conn.start() + + stop_event = threading.Event() + + def stop(signal_num, _current_stack_frame): + print(f"Received signal {signal_num}. Will stop S2 connection.") + stop_event.set() + + signal.signal(signal.SIGINT, stop) + signal.signal(signal.SIGTERM, stop) + stop_event.wait() + + s2_conn.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") + client_node_id = uuid.uuid4() + parser.add_argument( + "--endpoint", + type=str, + required=False, + help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8000/ws/{client_node_id}", + default=f"ws://localhost:8000/ws/{client_node_id}", + ) + args = parser.parse_args() + + start_s2_session(args.endpoint, client_node_id) diff --git a/src/s2python/connection/__init__.py b/src/s2python/connection/__init__.py index ad9fbdd..85d06ec 100644 --- a/src/s2python/connection/__init__.py +++ b/src/s2python/connection/__init__.py @@ -2,3 +2,4 @@ from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped, S2ConnectionEvent from s2python.connection.control_type import RoleHandler from s2python.connection.types import S2ConnectionEventsAndMessages + diff --git a/src/s2python/connection/asset_details.py b/src/s2python/connection/asset_details.py index 880f0a3..29a2451 100644 --- a/src/s2python/connection/asset_details.py +++ b/src/s2python/connection/asset_details.py @@ -1,3 +1,4 @@ +import typing import uuid from dataclasses import dataclass from typing import Optional, List @@ -9,8 +10,15 @@ Duration, Currency, ) -from s2python.generated.gen_s2 import CommodityQuantity -from s2python.s2_control_type import S2ControlType +from s2python.common import CommodityQuantity, ControlType + +if typing.TYPE_CHECKING: + from s2python.connection.async_.control_type.class_based import S2ControlType + + +class HasProtocolControlType(typing.Protocol): + def get_protocol_control_type(self) -> ControlType: + ... @dataclass class AssetDetails: # pylint: disable=too-many-instance-attributes @@ -30,7 +38,7 @@ class AssetDetails: # pylint: disable=too-many-instance-attributes serial_number: Optional[str] = None def to_resource_manager_details( - self, control_types: List[S2ControlType] + self, control_types: typing.Sequence[HasProtocolControlType] ) -> ResourceManagerDetails: return ResourceManagerDetails( available_control_types=[ diff --git a/src/s2python/connection/async_/connection.py b/src/s2python/connection/async_/connection.py index e540c99..eb0f6d3 100644 --- a/src/s2python/connection/async_/connection.py +++ b/src/s2python/connection/async_/connection.py @@ -1,12 +1,11 @@ -from websockets import ConnectionClosed -from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped -from s2python.connection.medium.s2_medium import S2MediumConnectionAsync, MediumClosedConnectionError +from s2python.connection.connection_events import ConnectionStopped +from s2python.connection.async_.medium.s2_medium import S2MediumConnection, MediumClosedConnectionError import asyncio import json import logging import uuid -from typing import Coroutine, Optional, List, Type, Any, Callable +from typing import Optional, Type from s2python.common import ( ReceptionStatusValues, @@ -15,7 +14,6 @@ from s2python.connection.async_.message_handlers import MessageHandlers, S2EventHandlerAsync from s2python.connection.types import S2ConnectionEventsAndMessages from s2python.reception_status_awaiter import ReceptionStatusAwaiter -from s2python.s2_control_type import S2ControlType from s2python.s2_parser import S2Parser from s2python.s2_validation_error import S2ValidationError from s2python.message import S2Message, S2MessageWithID @@ -37,13 +35,13 @@ class S2AsyncConnection: # pylint: disable=too-many-instance-attributes _received_messages: asyncio.Queue _reception_status_awaiter: ReceptionStatusAwaiter - _medium: S2MediumConnectionAsync + _medium: S2MediumConnection _s2_parser: S2Parser _handlers: MessageHandlers def __init__( # pylint: disable=too-many-arguments self, - medium: S2MediumConnectionAsync, + medium: S2MediumConnection, eventloop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self._eventloop = eventloop if eventloop is not None else asyncio.get_event_loop() @@ -78,7 +76,7 @@ async def _wait_till_stop(self) -> None: async def _run(self) -> None: self._received_messages = asyncio.Queue() - if not self._medium.is_connected(): + if not await self._medium.is_connected(): raise MediumClosedConnectionError("Cannot start the S2 connection if the underlying medium is closed.") background_tasks = [ @@ -115,6 +113,7 @@ async def _run(self) -> None: async def _handle_received_messages(self) -> None: while not self._stop_event.is_set(): msg = await self._received_messages.get() + logger.debug('Handling received message %s', msg.to_json()) await self._handlers.handle_event(self, msg) async def _receive_messages(self) -> None: @@ -125,7 +124,7 @@ async def _receive_messages(self) -> None: """ logger.info("S2 connection has started to receive messages.") - async for message in await self._medium.messages(): + async for message in self._medium.messages(): try: s2_msg: S2Message = self._s2_parser.parse_as_any_message(message) except json.JSONDecodeError: @@ -161,6 +160,7 @@ async def _receive_messages(self) -> None: ) await self._reception_status_awaiter.receive_reception_status(s2_msg) else: + logger.debug('Message is not a reception status, putting it in the received messages queue.') await self._received_messages.put(s2_msg) def register_handler(self, event_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerAsync) -> None: diff --git a/src/s2python/connection/async_/control_type/class_based.py b/src/s2python/connection/async_/control_type/class_based.py index 51776c3..1781e03 100644 --- a/src/s2python/connection/async_/control_type/class_based.py +++ b/src/s2python/connection/async_/control_type/class_based.py @@ -11,10 +11,10 @@ SelectControlType, ) from s2python.connection.async_.connection import S2AsyncConnection -from s2python.connection.types import S2ConnectionEvent +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages from s2python.version import S2_VERSION -from s2python.connection.connection_events import ConnectionStarted +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped from s2python.common import ControlType as ProtocolControlType from s2python.frbc import FRBCInstruction @@ -59,6 +59,7 @@ def register_handlers(self, connection: S2AsyncConnection) -> None: connection.register_handler(Handshake, self._on_handshake) connection.register_handler(HandshakeResponse, self._on_handshake_response) connection.register_handler(SelectControlType, self._on_select_control_type) + connection.register_handler(ConnectionStopped, self._on_connection_stop) async def _on_connection_started(self, connection: S2AsyncConnection, _: S2ConnectionEvent, __: Optional[Coroutine[Any, Any, None]]) -> None: await connection.send_msg_and_await_reception_status( @@ -159,7 +160,7 @@ def register_handlers(self, connection: S2AsyncConnection) -> None: @abc.abstractmethod async def handle_instruction( - self, conn: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] ) -> None: ... @abc.abstractmethod @@ -180,7 +181,7 @@ def register_handlers(self, connection: S2AsyncConnection) -> None: @abc.abstractmethod async def handle_instruction( - self, connection: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] ) -> None: ... @abc.abstractmethod @@ -201,7 +202,7 @@ def register_handlers(self, connection: S2AsyncConnection) -> None: @abc.abstractmethod async def handle_instruction( - self, connection: S2AsyncConnection, msg: S2Message, send_okay: Callable[[], None] + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] ) -> None: ... @abc.abstractmethod diff --git a/src/s2python/connection/sync/control_type/control_types.py b/src/s2python/connection/async_/medium/__init__.py similarity index 100% rename from src/s2python/connection/sync/control_type/control_types.py rename to src/s2python/connection/async_/medium/__init__.py diff --git a/src/s2python/connection/async_/message_handlers.py b/src/s2python/connection/async_/message_handlers.py index e27b995..ee671a3 100644 --- a/src/s2python/connection/async_/message_handlers.py +++ b/src/s2python/connection/async_/message_handlers.py @@ -1,10 +1,10 @@ -from asyncio.events import AbstractEventLoop -from s2python.connection.async_.connection import S2AsyncConnection - import asyncio import logging import uuid -from typing import Any, Coroutine, Literal, Optional, List, Type, Dict, Callable, Awaitable, Union +from typing import Any, Coroutine, Optional, Type, Dict, Callable, TYPE_CHECKING + +if TYPE_CHECKING: + from s2python.connection.async_.connection import S2AsyncConnection from s2python.common import ReceptionStatusValues from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages @@ -13,7 +13,7 @@ logger = logging.getLogger("s2python") -S2EventHandlerAsync = Callable[[S2AsyncConnection, S2ConnectionEvent, Optional[Coroutine[Any, Any, None]]], Coroutine[Any, Any, None]] +S2EventHandlerAsync = Callable[["S2AsyncConnection", S2ConnectionEvent, Optional[Coroutine[Any, Any, None]]], Coroutine[Any, Any, None]] class SendOkay: _status_is_send: asyncio.Event @@ -58,7 +58,7 @@ class MessageHandlers: def __init__(self) -> None: self.handlers = {} - async def handle_event(self, connection: S2AsyncConnection, event: S2ConnectionEventsAndMessages) -> None: + async def handle_event(self, connection: "S2AsyncConnection", event: S2ConnectionEventsAndMessages) -> None: """Handle the S2 message using the registered handler. :param connection: The S2 conncetion the `msg` is received from. @@ -68,10 +68,12 @@ async def handle_event(self, connection: S2AsyncConnection, event: S2ConnectionE if handler is not None: send_okay = None try: - if isinstance(event, S2MessageWithID): + if hasattr(event, "message_id"): + logger.debug('Handling S2 message with message id %s using handler %s', event.message_id, handler) send_okay = SendOkay(connection, event.message_id) await handler(connection, event, send_okay.run()) else: + logger.debug('Handling S2 connection event (without message id) using handler %s', handler) await handler(connection, event, None) except Exception: if send_okay and not send_okay._status_is_send.is_set(): @@ -81,7 +83,7 @@ async def handle_event(self, connection: S2AsyncConnection, event: S2ConnectionE diagnostic_label=f"While processing message {event.message_id} " f"an unrecoverable error occurred.", ) - raise + raise if send_okay: await send_okay.ensure_send(type(event)) else: @@ -91,7 +93,7 @@ async def handle_event(self, connection: S2AsyncConnection, event: S2ConnectionE ) def register_handler( - self, event_type: Type[S2ConnectionEvent], handler: S2MessageHandlerAsync + self, event_type: Type[S2ConnectionEvent], handler: S2EventHandlerAsync ) -> None: """Register a coroutine function or a normal function as the handler for a specific S2 message type. diff --git a/src/s2python/connection/sync/__init__.py b/src/s2python/connection/sync/__init__.py new file mode 100644 index 0000000..5d24e91 --- /dev/null +++ b/src/s2python/connection/sync/__init__.py @@ -0,0 +1,2 @@ +from s2python.connection.sync.connection import S2SyncConnection +from s2python.connection.sync.connection import S2EventHandlerSync \ No newline at end of file diff --git a/src/s2python/connection/sync/connection.py b/src/s2python/connection/sync/connection.py index a762ef8..30953fb 100644 --- a/src/s2python/connection/sync/connection.py +++ b/src/s2python/connection/sync/connection.py @@ -2,20 +2,17 @@ import logging import threading import uuid -from typing import Any, Coroutine, Optional, List, Type, Callable +from typing import Any, Coroutine, Optional, Type, Callable from s2python.common import ( ReceptionStatusValues, - EnergyManagementRole, ) -from s2python.connection.asset_details import AssetDetails -from s2python.connection.types import S2MessageHandlerSync, S2ConnectionEvent -from s2python.s2_control_type import S2ControlType +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages from s2python.message import S2Message from s2python.common import ReceptionStatus -from s2python.connection.medium.s2_medium import S2MediumConnectionAsync -from s2python.connection.s2_async_connection import S2AsyncConnection +from s2python.connection.async_.medium.s2_medium import S2MediumConnection +from s2python.connection.async_ import S2AsyncConnection from s2python.message import S2MessageWithID logger = logging.getLogger("s2python") @@ -23,7 +20,6 @@ S2EventHandlerSync = Callable[["S2SyncConnection", S2ConnectionEvent, Optional[Callable[[], None]]], None] - class S2SyncConnection: _thread: threading.Thread _eventloop: asyncio.AbstractEventLoop @@ -31,20 +27,17 @@ class S2SyncConnection: def __init__( # pylint: disable=too-many-arguments self, - role: EnergyManagementRole, - control_types: List[S2ControlType], - asset_details: AssetDetails, - medium: S2MediumConnectionAsync, + medium: S2MediumConnection, eventloop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self._thread = threading.Thread(target=self._run_eventloop) self._eventloop = asyncio.new_event_loop() if eventloop is None else eventloop - self._async_s2_connection = S2AsyncConnection(role, control_types, asset_details, medium, self._eventloop) + self._async_s2_connection = S2AsyncConnection(medium, self._eventloop) - def start_as_rm(self) -> None: + def start(self) -> None: self._thread.start() asyncio.run_coroutine_threadsafe( - self._async_s2_connection.start_as_rm(), + self._async_s2_connection.start(), self._eventloop, ).result() @@ -70,7 +63,7 @@ def stop(self) -> None: self._thread.join() logger.info("Stopped the S2 connection.") - def register_handler(self, s2_message_type: Type[S2MessageWithID], handler: S2MessageHandlerSync) -> None: + def register_handler(self, s2_message_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerSync) -> None: """Register a handler for a specific S2 message type. :param s2_message_type: The S2 message type to register the handler for. @@ -80,14 +73,14 @@ def register_handler(self, s2_message_type: Type[S2MessageWithID], handler: S2Me async def handle_s2_message_async_wrapper( _: S2AsyncConnection, s2_msg: S2ConnectionEvent, - send_okay: Coroutine[Any, Any, None], + send_okay: Optional[Coroutine[Any, Any, None]], ) -> None: await self._eventloop.run_in_executor( None, handler, self, s2_msg, - lambda: asyncio.run_coroutine_threadsafe(send_okay, self._eventloop).result(), + lambda: asyncio.run_coroutine_threadsafe(send_okay, self._eventloop).result() if send_okay else None, ) self._async_s2_connection.register_handler(s2_message_type, handle_s2_message_async_wrapper) diff --git a/src/s2python/connection/sync/control_type/class_based.py b/src/s2python/connection/sync/control_type/class_based.py new file mode 100644 index 0000000..8f71be5 --- /dev/null +++ b/src/s2python/connection/sync/control_type/class_based.py @@ -0,0 +1,241 @@ +import abc +import logging +import uuid +from typing import Optional, List, Callable + +from s2python.connection.asset_details import AssetDetails +from s2python.common import ( + Handshake, + EnergyManagementRole, + HandshakeResponse, + SelectControlType, +) +from s2python.connection.sync.connection import S2SyncConnection +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages +from s2python.version import S2_VERSION + +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped + +from s2python.common import ControlType as ProtocolControlType +from s2python.frbc import FRBCInstruction +from s2python.ppbc import PPBCScheduleInstruction +from s2python.ombc import OMBCInstruction + +logger = logging.getLogger("s2python") + + +class S2ControlType(abc.ABC): + @abc.abstractmethod + def get_protocol_control_type(self) -> ProtocolControlType: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... + + + +class ResourceManagerHandler: + asset_details: AssetDetails + _current_control_type: Optional[S2ControlType] + _control_types: List[S2ControlType] + + def __init__(self, control_types: List[S2ControlType], + asset_details: AssetDetails) -> None: + self.asset_details = asset_details + self._current_control_type = None + self._control_types = control_types + + def get_s2_role(self) -> EnergyManagementRole: + return EnergyManagementRole.RM + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(ConnectionStarted, self._on_connection_started) + connection.register_handler(Handshake, self._on_handshake) + connection.register_handler(HandshakeResponse, self._on_handshake_response) + connection.register_handler(SelectControlType, self._on_select_control_type) + connection.register_handler(ConnectionStopped, self._on_connection_stop) + + def _on_connection_started(self, connection: S2SyncConnection, _: S2ConnectionEvent, __: Optional[Callable[[], None]]) -> None: + connection.send_msg_and_await_reception_status( + Handshake( + message_id=uuid.uuid4(), + role=self.get_s2_role(), + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug( + "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." + ) + + def _on_handshake( + self, _: S2SyncConnection, event: S2ConnectionEvent, send_okay: Optional[Callable[[], None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + event.role, + event.supported_protocol_versions, + ) + send_okay() + + def _on_handshake_response( + self, connection: S2SyncConnection, event: S2ConnectionEvent, send_okay: Optional[Callable[[], None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug("Received HandshakeResponse %s", event.to_json()) + logger.debug( + "CEM selected to use version %s", event.selected_protocol_version + ) + send_okay() + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + connection.send_msg_and_await_reception_status( + self.asset_details.to_resource_manager_details(self._control_types) + ) + + def _on_select_control_type( + self, connection: S2SyncConnection, event: S2ConnectionEvent, send_okay: Optional[Callable[[], None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(event), + ) + return + + send_okay + + logger.debug( + "CEM selected control type %s. Activating control type.", + event.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self._control_types + } + selected_control_type = control_types_by_protocol_name.get(event.control_type) + + if self._current_control_type is not None: + self._current_control_type.deactivate(connection) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + self._current_control_type.register_handlers(connection) + self._current_control_type.activate(connection) + + def _on_connection_stop(self, connection: S2SyncConnection, __: S2ConnectionEvent, ___: Optional[Callable[[], None]]): + if self._current_control_type: + self._current_control_type.deactivate(connection) + self._current_control_type = None + + +class FRBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.FILL_RATE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(FRBCInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PPBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_PROFILE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(PPBCScheduleInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class OMBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.OPERATION_MODE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(OMBCInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PEBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + pass + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... + + +class NoControlControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.NOT_CONTROLABLE + + def register_handlers(self, connection: S2SyncConnection) -> None: + pass + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ...