diff --git a/examples/async_frbc_rm.py b/examples/async_frbc_rm.py new file mode 100644 index 0000000..2046e02 --- /dev/null +++ b/examples/async_frbc_rm.py @@ -0,0 +1,207 @@ +import argparse +import asyncio +from functools import partial +import logging +import sys +import uuid +import signal +import datetime +from typing import Callable, Optional, Coroutine, Any + +from s2python.connection.types import S2ConnectionEventsAndMessages +from s2python.common import ( + Duration, + Role, + RoleType, + Commodity, + Currency, + NumberRange, + PowerRange, + CommodityQuantity, +) +from s2python.frbc import ( + FRBCInstruction, + FRBCSystemDescription, + FRBCActuatorDescription, + FRBCStorageDescription, + FRBCOperationMode, + FRBCOperationModeElement, + FRBCFillLevelTargetProfile, + FRBCFillLevelTargetProfileElement, + FRBCStorageStatus, + FRBCActuatorStatus, +) +from s2python.connection import AssetDetails +from s2python.connection.async_ import S2AsyncConnection +from s2python.connection.async_.medium.websocket import WebsocketClientMedium +from s2python.connection.async_.control_type.class_based import FRBCControlType, NoControlControlType, ResourceManagerHandler + +logger = logging.getLogger("s2python") +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) + + +class MyFRBCControlType(FRBCControlType): + async def handle_instruction( + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: + if not isinstance(msg, FRBCInstruction): + raise RuntimeError( + f"Expected an FRBCInstruction but received a message of type {type(msg)}." + ) + print(f"I have received the message {msg} from {connection}") + + async def activate(self, connection: S2AsyncConnection) -> None: + print("The control type FRBC is now activated.") + + print("Time to send a FRBC SystemDescription") + actuator_id = uuid.uuid4() + operation_mode_id = uuid.uuid4() + await connection.send_msg_and_await_reception_status( + FRBCSystemDescription( + message_id=uuid.uuid4(), + valid_from=datetime.datetime.now(tz=datetime.timezone.utc), + actuators=[ + FRBCActuatorDescription( + id=actuator_id, + operation_modes=[ + FRBCOperationMode( + id=operation_mode_id, + elements=[ + FRBCOperationModeElement( + fill_level_range=NumberRange( + start_of_range=0.0, end_of_range=100.0 + ), + fill_rate=NumberRange( + start_of_range=-5.0, end_of_range=5.0 + ), + power_ranges=[ + PowerRange( + start_of_range=-200.0, + end_of_range=200.0, + commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1, + ) + ], + ) + ], + diagnostic_label="Load & unload battery", + abnormal_condition_only=False, + ) + ], + transitions=[], + timers=[], + supported_commodities=[Commodity.ELECTRICITY], + ) + ], + storage=FRBCStorageDescription( + fill_level_range=NumberRange(start_of_range=0.0, end_of_range=100.0), + fill_level_label="%", + diagnostic_label="Imaginary battery", + provides_fill_level_target_profile=True, + provides_leakage_behaviour=False, + provides_usage_forecast=False, + ), + ) + ) + print("Also send the target profile") + + await connection.send_msg_and_await_reception_status( + FRBCFillLevelTargetProfile( + message_id=uuid.uuid4(), + start_time=datetime.datetime.now(tz=datetime.timezone.utc), + elements=[ + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(30_000), + fill_level_range=NumberRange(start_of_range=20.0, end_of_range=30.0), + ), + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(300_000), + fill_level_range=NumberRange(start_of_range=40.0, end_of_range=50.0), + ), + ], + ) + ) + + print("Also send the storage status.") + await connection.send_msg_and_await_reception_status( + FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0) + ) + + print("Also send the actuator status.") + await connection.send_msg_and_await_reception_status( + FRBCActuatorStatus( + message_id=uuid.uuid4(), + actuator_id=actuator_id, + active_operation_mode_id=operation_mode_id, + operation_mode_factor=0.5, + ) + ) + + async def deactivate(self, connection: S2AsyncConnection) -> None: + print("The control type FRBC is now deactivated.") + + +class MyNoControlControlType(NoControlControlType): + async def activate(self, connection: S2AsyncConnection) -> None: + print("The control type NoControl is now activated.") + + async def deactivate(self, connection: S2AsyncConnection) -> None: + print("The control type NoControl is now deactivated.") + + +def stop(s2_connection, signal_num, _current_stack_frame): + print(f"Received signal {signal_num}. Will stop S2 connection.") + s2_connection.stop() + + +async def start_s2_session(url, client_node_id: uuid.UUID): + # Configure a resource manager + rm_handler = ResourceManagerHandler( + asset_details=AssetDetails( + resource_id=client_node_id, + name="Some asset", + instruction_processing_delay=Duration.from_milliseconds(20), + roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)], + currency=Currency.EUR, + provides_forecast=False, + provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], + ), + control_types=[MyFRBCControlType(), MyNoControlControlType()] + ) + + # Setup the underlying websocket connection + ws_medium = WebsocketClientMedium(url=url, verify_certificate=False) + await ws_medium.connect() + + # Configure the S2 connection on top of the websocket connection + s2_conn = S2AsyncConnection(medium=ws_medium) + rm_handler.register_handlers(s2_conn) + await s2_conn.start() + + stop_event = asyncio.Event() + eventloop = asyncio.get_running_loop() + + async def stop(): + print(f"Received signal. Will stop S2 connection.") + stop_event.set() + + eventloop.add_signal_handler(signal.SIGINT, lambda: eventloop.create_task(stop())) + eventloop.add_signal_handler(signal.SIGTERM, lambda: eventloop.create_task(stop())) + await stop_event.wait() + + await s2_conn.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") + client_node_id = uuid.uuid4() + parser.add_argument( + "--endpoint", + type=str, + required=False, + help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8000/ws/{client_node_id}", + default=f"ws://localhost:8000/ws/{client_node_id}", + ) + args = parser.parse_args() + + asyncio.run(start_s2_session(args.endpoint, client_node_id)) diff --git a/examples/example_frbc_rm.py b/examples/sync_frbc_rm.py similarity index 71% rename from examples/example_frbc_rm.py rename to examples/sync_frbc_rm.py index d69473d..136fdd7 100644 --- a/examples/example_frbc_rm.py +++ b/examples/sync_frbc_rm.py @@ -1,14 +1,14 @@ import argparse -from functools import partial +import asyncio +import threading import logging import sys import uuid import signal import datetime -from typing import Callable +from typing import Callable, Optional from s2python.common import ( - EnergyManagementRole, Duration, Role, RoleType, @@ -18,6 +18,7 @@ PowerRange, CommodityQuantity, ) +from s2python.connection.types import S2ConnectionEventsAndMessages from s2python.frbc import ( FRBCInstruction, FRBCSystemDescription, @@ -30,9 +31,10 @@ FRBCStorageStatus, FRBCActuatorStatus, ) -from s2python.s2_connection import S2Connection, AssetDetails -from s2python.s2_control_type import FRBCControlType, NoControlControlType -from s2python.message import S2Message +from s2python.connection import AssetDetails +from s2python.connection.sync import S2SyncConnection +from s2python.connection.async_.medium.websocket import WebsocketClientMedium +from s2python.connection.sync.control_type.class_based import FRBCControlType, NoControlControlType, ResourceManagerHandler logger = logging.getLogger("s2python") logger.addHandler(logging.StreamHandler(sys.stdout)) @@ -41,21 +43,21 @@ class MyFRBCControlType(FRBCControlType): def handle_instruction( - self, conn: S2Connection, msg: S2Message, send_okay: Callable[[], None] + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] ) -> None: if not isinstance(msg, FRBCInstruction): raise RuntimeError( f"Expected an FRBCInstruction but received a message of type {type(msg)}." ) - print(f"I have received the message {msg} from {conn}") + print(f"I have received the message {msg} from {connection}") - def activate(self, conn: S2Connection) -> None: + def activate(self, connection: S2SyncConnection) -> None: print("The control type FRBC is now activated.") print("Time to send a FRBC SystemDescription") actuator_id = uuid.uuid4() operation_mode_id = uuid.uuid4() - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCSystemDescription( message_id=uuid.uuid4(), valid_from=datetime.datetime.now(tz=datetime.timezone.utc), @@ -103,7 +105,7 @@ def activate(self, conn: S2Connection) -> None: ) print("Also send the target profile") - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCFillLevelTargetProfile( message_id=uuid.uuid4(), start_time=datetime.datetime.now(tz=datetime.timezone.utc), @@ -121,12 +123,12 @@ def activate(self, conn: S2Connection) -> None: ) print("Also send the storage status.") - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0) ) print("Also send the actuator status.") - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCActuatorStatus( message_id=uuid.uuid4(), actuator_id=actuator_id, @@ -135,15 +137,15 @@ def activate(self, conn: S2Connection) -> None: ) ) - def deactivate(self, conn: S2Connection) -> None: + def deactivate(self, connection: S2SyncConnection) -> None: print("The control type FRBC is now deactivated.") class MyNoControlControlType(NoControlControlType): - def activate(self, conn: S2Connection) -> None: + def activate(self, connection: S2SyncConnection) -> None: print("The control type NoControl is now activated.") - def deactivate(self, conn: S2Connection) -> None: + def deactivate(self, connection: S2SyncConnection) -> None: print("The control type NoControl is now deactivated.") @@ -152,11 +154,9 @@ def stop(s2_connection, signal_num, _current_stack_frame): s2_connection.stop() -def start_s2_session(url, client_node_id=str(uuid.uuid4())): - s2_conn = S2Connection( - url=url, - role=EnergyManagementRole.RM, - control_types=[MyFRBCControlType(), MyNoControlControlType()], +def start_s2_session(url, client_node_id: uuid.UUID): + # Configure a resource manager + rm_handler = ResourceManagerHandler( asset_details=AssetDetails( resource_id=client_node_id, name="Some asset", @@ -166,23 +166,43 @@ def start_s2_session(url, client_node_id=str(uuid.uuid4())): provides_forecast=False, provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], ), - reconnect=True, - verify_certificate=False, + control_types=[MyFRBCControlType(), MyNoControlControlType()] ) - signal.signal(signal.SIGINT, partial(stop, s2_conn)) - signal.signal(signal.SIGTERM, partial(stop, s2_conn)) - s2_conn.start_as_rm() + # Setup the underlying websocket connection + ws_medium = WebsocketClientMedium(url=url, verify_certificate=False) + + eventloop = asyncio.get_event_loop() + eventloop.run_until_complete(ws_medium.connect()) + + # Configure the S2 connection on top of the websocket connection + s2_conn = S2SyncConnection(medium=ws_medium) + rm_handler.register_handlers(s2_conn) + s2_conn.start() + + stop_event = threading.Event() + + def stop(signal_num, _current_stack_frame): + print(f"Received signal {signal_num}. Will stop S2 connection.") + stop_event.set() + + signal.signal(signal.SIGINT, stop) + signal.signal(signal.SIGTERM, stop) + stop_event.wait() + + s2_conn.stop() if __name__ == "__main__": parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") + client_node_id = uuid.uuid4() parser.add_argument( - "endpoint", + "--endpoint", type=str, - help="WebSocket endpoint uri for the server (CEM) e.g. " - "ws://localhost:8080/backend/rm/s2python-frbc/cem/dummy_model/ws", + required=False, + help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8000/ws/{client_node_id}", + default=f"ws://localhost:8000/ws/{client_node_id}", ) args = parser.parse_args() - start_s2_session(args.endpoint) + start_s2_session(args.endpoint, client_node_id) diff --git a/src/s2python/connection/__init__.py b/src/s2python/connection/__init__.py new file mode 100644 index 0000000..85d06ec --- /dev/null +++ b/src/s2python/connection/__init__.py @@ -0,0 +1,5 @@ +from s2python.connection.asset_details import AssetDetails +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped, S2ConnectionEvent +from s2python.connection.control_type import RoleHandler +from s2python.connection.types import S2ConnectionEventsAndMessages + diff --git a/src/s2python/connection/asset_details.py b/src/s2python/connection/asset_details.py new file mode 100644 index 0000000..29a2451 --- /dev/null +++ b/src/s2python/connection/asset_details.py @@ -0,0 +1,60 @@ +import typing +import uuid +from dataclasses import dataclass +from typing import Optional, List + + +from s2python.common import ( + Role, + ResourceManagerDetails, + Duration, + Currency, +) +from s2python.common import CommodityQuantity, ControlType + +if typing.TYPE_CHECKING: + from s2python.connection.async_.control_type.class_based import S2ControlType + + +class HasProtocolControlType(typing.Protocol): + def get_protocol_control_type(self) -> ControlType: + ... + +@dataclass +class AssetDetails: # pylint: disable=too-many-instance-attributes + resource_id: uuid.UUID + + provides_forecast: bool + provides_power_measurements: List[CommodityQuantity] + + instruction_processing_delay: Duration + roles: List[Role] + currency: Optional[Currency] = None + + name: Optional[str] = None + manufacturer: Optional[str] = None + model: Optional[str] = None + firmware_version: Optional[str] = None + serial_number: Optional[str] = None + + def to_resource_manager_details( + self, control_types: typing.Sequence[HasProtocolControlType] + ) -> ResourceManagerDetails: + return ResourceManagerDetails( + available_control_types=[ + control_type.get_protocol_control_type() + for control_type in control_types + ], + currency=self.currency, + firmware_version=self.firmware_version, + instruction_processing_delay=self.instruction_processing_delay, + manufacturer=self.manufacturer, + message_id=uuid.uuid4(), + model=self.model, + name=self.name, + provides_forecast=self.provides_forecast, + provides_power_measurement_types=self.provides_power_measurements, + resource_id=self.resource_id, + roles=self.roles, + serial_number=self.serial_number, + ) diff --git a/src/s2python/connection/async_/__init__.py b/src/s2python/connection/async_/__init__.py new file mode 100644 index 0000000..52ca6dc --- /dev/null +++ b/src/s2python/connection/async_/__init__.py @@ -0,0 +1,13 @@ +from s2python.connection.async_.connection import S2AsyncConnection, CouldNotReceiveStatusReceptionError +from s2python.connection.async_.message_handlers import S2EventHandlerAsync +from s2python.connection.async_.medium.s2_medium import S2MediumConnection +from s2python.connection.async_.medium.websocket import WebsocketClientMedium + + +__all__ = [ + "S2AsyncConnection", + "CouldNotReceiveStatusReceptionError", + "S2EventHandlerAsync", + "S2MediumConnection", + "WebsocketClientMedium" +] diff --git a/src/s2python/connection/async_/connection.py b/src/s2python/connection/async_/connection.py new file mode 100644 index 0000000..eb0f6d3 --- /dev/null +++ b/src/s2python/connection/async_/connection.py @@ -0,0 +1,241 @@ +from s2python.connection.connection_events import ConnectionStopped +from s2python.connection.async_.medium.s2_medium import S2MediumConnection, MediumClosedConnectionError + +import asyncio +import json +import logging +import uuid +from typing import Optional, Type + +from s2python.common import ( + ReceptionStatusValues, + ReceptionStatus, +) +from s2python.connection.async_.message_handlers import MessageHandlers, S2EventHandlerAsync +from s2python.connection.types import S2ConnectionEventsAndMessages +from s2python.reception_status_awaiter import ReceptionStatusAwaiter +from s2python.s2_parser import S2Parser +from s2python.s2_validation_error import S2ValidationError +from s2python.message import S2Message, S2MessageWithID +from s2python.connection.connection_events import ConnectionStarted + +logger = logging.getLogger("s2python") + + + +class CouldNotReceiveStatusReceptionError(Exception): + ... + + +class S2AsyncConnection: # pylint: disable=too-many-instance-attributes + _eventloop: asyncio.AbstractEventLoop + _main_task: Optional[asyncio.Task] + _stop_event: asyncio.Event + """Stop the S2 connection permanently.""" + _received_messages: asyncio.Queue + + _reception_status_awaiter: ReceptionStatusAwaiter + _medium: S2MediumConnection + _s2_parser: S2Parser + _handlers: MessageHandlers + + def __init__( # pylint: disable=too-many-arguments + self, + medium: S2MediumConnection, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self._eventloop = eventloop if eventloop is not None else asyncio.get_event_loop() + self._main_task = None + self._stop_event = asyncio.Event() + + self._reception_status_awaiter = ReceptionStatusAwaiter() + self._medium = medium + self._s2_parser = S2Parser() + self._handlers = MessageHandlers() + + async def start(self) -> None: + """Start this connection with the given S2 role such as resource manager or CEM and connect to the other party.""" + logger.debug('Starting S2 connection as %s.',) + + self._main_task = self._eventloop.create_task(self._run()) + + async def stop(self) -> None: + """Stop the S2 connection gracefully and wait till it stops. + + Note: Not thread-safe. Must be run from the same event loop as `start_as_rm` runs in. + Does not stop the underlying medium! + """ + logger.info("Will stop the S2 connection.") + self._stop_event.set() + if self._main_task is not None: + await self._main_task + + async def _wait_till_stop(self) -> None: + await self._stop_event.wait() + + async def _run(self) -> None: + self._received_messages = asyncio.Queue() + + if not await self._medium.is_connected(): + raise MediumClosedConnectionError("Cannot start the S2 connection if the underlying medium is closed.") + + background_tasks = [ + self._eventloop.create_task(self._receive_messages()), + self._eventloop.create_task(self._wait_till_stop()), + self._eventloop.create_task(self._handle_received_messages()), + ] + + await self._handlers.handle_event(self, ConnectionStarted()) + + (done, pending) = await asyncio.wait( + background_tasks, return_when=asyncio.FIRST_COMPLETED + ) + + await self._handlers.handle_event(self, ConnectionStopped()) + + for task in done: + try: + await task + except asyncio.CancelledError: + pass + except MediumClosedConnectionError: + logger.info("The other party closed the websocket connection.") + except Exception: + logger.exception("An error occurred in the S2 connection. Terminating current connection.") + + for task in pending: + try: + task.cancel() + await task + except (asyncio.CancelledError, Exception): + pass + + async def _handle_received_messages(self) -> None: + while not self._stop_event.is_set(): + msg = await self._received_messages.get() + logger.debug('Handling received message %s', msg.to_json()) + await self._handlers.handle_event(self, msg) + + async def _receive_messages(self) -> None: + """Receives all incoming messages in the form of a generator. + + Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed + to any calls of `send_msg_and_await_reception_status`. + """ + logger.info("S2 connection has started to receive messages.") + + async for message in self._medium.messages(): + try: + s2_msg: S2Message = self._s2_parser.parse_as_any_message(message) + except json.JSONDecodeError: + await self.send_and_forget( + ReceptionStatus( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Not valid json.", + ) + ) + except S2ValidationError as e: + json_msg = json.loads(message) + message_id = json_msg.get("message_id") + if message_id: + await self.respond_with_reception_status( + subject_message_id=message_id, + status=ReceptionStatusValues.INVALID_MESSAGE, + diagnostic_label=str(e), + ) + else: + await self.respond_with_reception_status( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Message appears valid json but could not find a message_id field.", + ) + else: + logger.debug("Received message %s", s2_msg.to_json()) + + if isinstance(s2_msg, ReceptionStatus): + logger.debug( + "Message is a reception status for %s so registering in cache.", + s2_msg.subject_message_id, + ) + await self._reception_status_awaiter.receive_reception_status(s2_msg) + else: + logger.debug('Message is not a reception status, putting it in the received messages queue.') + await self._received_messages.put(s2_msg) + + def register_handler(self, event_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerAsync) -> None: + """Register a handler for a specific S2 message type. + + :param event_type: The S2 connection event to register the handler for. + :param handler: The handler function (asynchronous or normal) which will handle the message. + """ + self._handlers.register_handler(event_type, handler) + + def unregister_handler(self, s2_message_type: Type[S2ConnectionEventsAndMessages]) -> None: + self._handlers.unregister_handler(s2_message_type) + + async def send_and_forget(self, s2_msg: S2Message) -> None: + json_msg = s2_msg.to_json() + logger.debug("Sending message %s", json_msg) + try: + await self._medium.send(json_msg) + except MediumClosedConnectionError: + logger.error("Unable to send message %s due to %s", s2_msg, str(e)) + raise + + async def respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + logger.debug( + "Responding to message %s with status %s", subject_message_id, status + ) + await self.send_and_forget( + ReceptionStatus( + subject_message_id=subject_message_id, + status=status, + diagnostic_label=diagnostic_label, + ) + ) + + async def send_msg_and_await_reception_status( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + await self.send_and_forget(s2_msg) + logger.debug( + "Waiting for ReceptionStatus for %s %s seconds", + s2_msg.message_id, + timeout_reception_status, + ) + reception_status_task = self._eventloop.create_task(self._reception_status_awaiter.wait_for_reception_status( + s2_msg.message_id, timeout_reception_status + )) + stop_event_task = self._eventloop.create_task(self._wait_till_stop()) + + (done, pending) = await asyncio.wait([reception_status_task, stop_event_task], return_when=asyncio.FIRST_COMPLETED) + + for task in pending: + try: + task.cancel() + await task + except (asyncio.CancelledError, Exception): + pass + + if reception_status_task in done: + try: + reception_status = await reception_status_task + except TimeoutError: + logger.error("Did not receive a reception status on time for %s", s2_msg.message_id) + self._stop_event.set() + raise + else: + #stop_event_task in done + await stop_event_task + raise CouldNotReceiveStatusReceptionError(f"Connection stopped while waiting for ReceptionStatus for message {s2_msg.message_id}") + + if reception_status.status != ReceptionStatusValues.OK and raise_on_error: + raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}") + + return reception_status diff --git a/src/s2python/connection/async_/control_type/__init__.py b/src/s2python/connection/async_/control_type/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/s2python/connection/async_/control_type/class_based.py b/src/s2python/connection/async_/control_type/class_based.py new file mode 100644 index 0000000..1781e03 --- /dev/null +++ b/src/s2python/connection/async_/control_type/class_based.py @@ -0,0 +1,242 @@ +import abc +import logging +import uuid +from typing import Coroutine, Optional, List, Any, Callable + +from s2python.connection.asset_details import AssetDetails +from s2python.common import ( + Handshake, + EnergyManagementRole, + HandshakeResponse, + SelectControlType, +) +from s2python.connection.async_.connection import S2AsyncConnection +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages +from s2python.version import S2_VERSION + +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped + +from s2python.common import ControlType as ProtocolControlType +from s2python.frbc import FRBCInstruction +from s2python.ppbc import PPBCScheduleInstruction +from s2python.ombc import OMBCInstruction +from s2python.message import S2Message + +logger = logging.getLogger("s2python") + + +class S2ControlType(abc.ABC): + @abc.abstractmethod + def get_protocol_control_type(self) -> ProtocolControlType: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... + + + +class ResourceManagerHandler: + asset_details: AssetDetails + _current_control_type: Optional[S2ControlType] + _control_types: List[S2ControlType] + + def __init__(self, control_types: List[S2ControlType], + asset_details: AssetDetails) -> None: + self.asset_details = asset_details + self._current_control_type = None + self._control_types = control_types + + def get_s2_role(self) -> EnergyManagementRole: + return EnergyManagementRole.RM + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(ConnectionStarted, self._on_connection_started) + connection.register_handler(Handshake, self._on_handshake) + connection.register_handler(HandshakeResponse, self._on_handshake_response) + connection.register_handler(SelectControlType, self._on_select_control_type) + connection.register_handler(ConnectionStopped, self._on_connection_stop) + + async def _on_connection_started(self, connection: S2AsyncConnection, _: S2ConnectionEvent, __: Optional[Coroutine[Any, Any, None]]) -> None: + await connection.send_msg_and_await_reception_status( + Handshake( + message_id=uuid.uuid4(), + role=self.get_s2_role(), + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug( + "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." + ) + + async def _on_handshake( + self, _: S2AsyncConnection, event: S2ConnectionEvent, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + event.role, + event.supported_protocol_versions, + ) + await send_okay + + async def _on_handshake_response( + self, connection: S2AsyncConnection, event: S2ConnectionEvent, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug("Received HandshakeResponse %s", event.to_json()) + logger.debug( + "CEM selected to use version %s", event.selected_protocol_version + ) + await send_okay + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + await connection.send_msg_and_await_reception_status( + self.asset_details.to_resource_manager_details(self._control_types) + ) + + async def _on_select_control_type( + self, connection: S2AsyncConnection, event: S2ConnectionEvent, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(event), + ) + return + + await send_okay + + logger.debug( + "CEM selected control type %s. Activating control type.", + event.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self._control_types + } + selected_control_type = control_types_by_protocol_name.get(event.control_type) + + if self._current_control_type is not None: + await self._current_control_type.deactivate(connection) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + self._current_control_type.register_handlers(connection) + await self._current_control_type.activate(connection) + + async def _on_connection_stop(self, connection: S2AsyncConnection, __: S2ConnectionEvent, ___: Optional[Coroutine[Any, Any, None]]): + if self._current_control_type: + await self._current_control_type.deactivate(connection) + self._current_control_type = None + + +class FRBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.FILL_RATE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(FRBCInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PPBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_PROFILE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(PPBCScheduleInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class OMBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.OPERATION_MODE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(OMBCInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, connection: S2AsyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Coroutine[Any, Any, None]] + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PEBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + pass + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... + + +class NoControlControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.NOT_CONTROLABLE + + def register_handlers(self, connection: S2AsyncConnection) -> None: + pass + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... diff --git a/src/s2python/connection/async_/medium/__init__.py b/src/s2python/connection/async_/medium/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/s2python/connection/async_/medium/s2_medium.py b/src/s2python/connection/async_/medium/s2_medium.py new file mode 100644 index 0000000..ddf9d59 --- /dev/null +++ b/src/s2python/connection/async_/medium/s2_medium.py @@ -0,0 +1,62 @@ +import abc +import typing +from typing import AsyncGenerator, Awaitable, Callable + +from s2python.s2_parser import UnparsedS2Message + + +class S2MediumException(Exception): + ... + +class MediumClosedConnectionError(S2MediumException): + ... + +class MediumCouldNotConnectError(S2MediumException): + ... + + +class S2MediumConnection(abc.ABC): + @abc.abstractmethod + async def is_connected(self) -> bool: + ... + + @abc.abstractmethod + async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: + ... + + @abc.abstractmethod + async def send(self, message: str) -> None: + ... + + +# BuildS2ConnectionAsync = Callable[[S2MediumConnectionAsync], Awaitable["S2AsyncConnection"]] +# +# +# class S2MediumConnectorAsync(abc.ABC): +# """S2 medium specific factory for S2Connections.""" +# +# @abc.abstractmethod +# async def set_connection_builder(self, +# builder: BuildS2ConnectionAsync) -> None: +# ... +# +# @abc.abstractmethod +# async def run(self) -> None: +# """Start up the connection or start listening for new connections. +# +# This function may block or not depending in the implementation. +# E.g. it will block if a listening socket is opened, or it may return once a single client +# connection is established. +# """ +# ... +# +# @abc.abstractmethod +# async def close(self) -> None: +# """Close the medium connector. +# +# This does not close any functions created by the connector, only the connector itself. +# Also, this function may not be implemented in all cases. For instance, if the connector +# only creates a single client connection and then exits, there is no need to close anything +# so in those cases this function may be a no-op. +# """ +# ... diff --git a/src/s2python/connection/async_/medium/websocket.py b/src/s2python/connection/async_/medium/websocket.py new file mode 100644 index 0000000..2530cd8 --- /dev/null +++ b/src/s2python/connection/async_/medium/websocket.py @@ -0,0 +1,80 @@ +import logging +import ssl +from typing import AsyncGenerator, Optional, Dict, Any +from typing_extensions import override + +from s2python.s2_parser import UnparsedS2Message + +try: + import websockets + from websockets.asyncio.client import ( + ClientConnection as WSConnection, + connect as ws_connect, + ) +except ImportError as exc: + raise ImportError( + "The 'websockets' package is required. Run 'pip install s2-python[ws]' to use this feature." + ) from exc + +from s2python.connection.async_.medium.s2_medium import MediumClosedConnectionError, MediumCouldNotConnectError, S2MediumConnection + +logger = logging.getLogger("s2python") + + +class WebsocketClientMedium(S2MediumConnection): + url: str + + _ws: Optional[WSConnection] + _verify_certificate: bool + _bearer_token: Optional[str] + _closed: bool + + def __init__(self, url: str, verify_certificate: bool = True, bearer_token: Optional[str] = None) -> None: + self.url = url + + self._ws = None + self._verify_certificate = verify_certificate + self._bearer_token = bearer_token + self._closed = False + + async def connect(self) -> None: + try: + # set up connection arguments for SSL and bearer token, if required + connection_kwargs: Dict[str, Any] = {} + if self.url.startswith("wss://") and not self._verify_certificate: + connection_kwargs["ssl"] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + connection_kwargs["ssl"].check_hostname = False + connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE + + if self._bearer_token: + connection_kwargs["additional_headers"] = { + "Authorization": f"Bearer {self._bearer_token}" + } + + self.ws = await ws_connect(uri=self.url, **connection_kwargs) + except (EOFError, OSError, websockets.WebSocketException) as e: + self._closed = True + message = f"Could not connect due to: {e}" + logger.error(message) + raise MediumCouldNotConnectError(message) + + @override + async def is_connected(self) -> bool: + return self.ws is not None and not self._closed + + @override + async def messages(self) -> AsyncGenerator[UnparsedS2Message, None]: + try: + async for message in self.ws: + yield message + except websockets.WebSocketException as e: + self._closed = True + raise MediumClosedConnectionError(f'Could not receive more messages on websocket connection {self.url}') from e + + @override + async def send(self, message: str) -> None: + try: + await self.ws.send(message) + except websockets.WebSocketException as e: + self._closed = True + raise MediumClosedConnectionError(f'Could not send message {message}') from e diff --git a/src/s2python/connection/async_/message_handlers.py b/src/s2python/connection/async_/message_handlers.py new file mode 100644 index 0000000..ee671a3 --- /dev/null +++ b/src/s2python/connection/async_/message_handlers.py @@ -0,0 +1,107 @@ +import asyncio +import logging +import uuid +from typing import Any, Coroutine, Optional, Type, Dict, Callable, TYPE_CHECKING + +if TYPE_CHECKING: + from s2python.connection.async_.connection import S2AsyncConnection + +from s2python.common import ReceptionStatusValues +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages +from s2python.message import S2Message, S2MessageWithID + + +logger = logging.getLogger("s2python") + +S2EventHandlerAsync = Callable[["S2AsyncConnection", S2ConnectionEvent, Optional[Coroutine[Any, Any, None]]], Coroutine[Any, Any, None]] + +class SendOkay: + _status_is_send: asyncio.Event + _connection: "S2AsyncConnection" + _subject_message_id: uuid.UUID + + def __init__(self, connection: "S2AsyncConnection", subject_message_id: uuid.UUID): + self._status_is_send = asyncio.Event() + self._connection = connection + self._subject_message_id = subject_message_id + + async def run(self) -> None: + """Send the ReceptionStatus OK asynchronously and register it.""" + self._status_is_send.set() + + await self._connection.respond_with_reception_status( # pylint: disable=protected-access + subject_message_id=self._subject_message_id, + status=ReceptionStatusValues.OK, + diagnostic_label="Processed okay.", + ) + + async def ensure_send(self, type_msg: Type[S2Message]) -> None: + """Ensure that the ReceptionStatus has been send. + + Send the ReceptionStatus OK if it hasn't been send yet. + + :param type_msg: The type of S2 message for which the ReceptionStatus should have been send. Logging purposes. + """ + if not self._status_is_send.is_set(): + logger.warning( + "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " + "Sending it now.", + type_msg, + self._subject_message_id, + ) + await self.run() + + +class MessageHandlers: + handlers: Dict[Type[S2ConnectionEventsAndMessages], S2EventHandlerAsync] + + def __init__(self) -> None: + self.handlers = {} + + async def handle_event(self, connection: "S2AsyncConnection", event: S2ConnectionEventsAndMessages) -> None: + """Handle the S2 message using the registered handler. + + :param connection: The S2 conncetion the `msg` is received from. + :param msg: The S2 message + """ + handler = self.handlers.get(type(event)) + if handler is not None: + send_okay = None + try: + if hasattr(event, "message_id"): + logger.debug('Handling S2 message with message id %s using handler %s', event.message_id, handler) + send_okay = SendOkay(connection, event.message_id) + await handler(connection, event, send_okay.run()) + else: + logger.debug('Handling S2 connection event (without message id) using handler %s', handler) + await handler(connection, event, None) + except Exception: + if send_okay and not send_okay._status_is_send.is_set(): + await connection.respond_with_reception_status( + subject_message_id=event.message_id, + status=ReceptionStatusValues.PERMANENT_ERROR, + diagnostic_label=f"While processing message {event.message_id} " + f"an unrecoverable error occurred.", + ) + raise + if send_okay: + await send_okay.ensure_send(type(event)) + else: + logger.warning( + "Received an event of type %s but no handler is registered. Ignoring the event.", + type(event), + ) + + def register_handler( + self, event_type: Type[S2ConnectionEvent], handler: S2EventHandlerAsync + ) -> None: + """Register a coroutine function or a normal function as the handler for a specific S2 message type. + + :param msg_type: The S2 message type to attach the handler to. + :param handler: The function (asynchronuous or normal) which should handle the S2 message. + """ + self.handlers[event_type] = handler + + def unregister_handler(self, s2_message_type: Type[S2ConnectionEvent]): + if s2_message_type in self.handlers: + del self.handlers[s2_message_type] diff --git a/src/s2python/connection/connection_events.py b/src/s2python/connection/connection_events.py new file mode 100644 index 0000000..9f65d40 --- /dev/null +++ b/src/s2python/connection/connection_events.py @@ -0,0 +1,12 @@ +import abc + + +class S2ConnectionEvent(abc.ABC): + pass + + +class ConnectionStarted(S2ConnectionEvent): + pass + +class ConnectionStopped(S2ConnectionEvent): + pass diff --git a/src/s2python/connection/control_type.py b/src/s2python/connection/control_type.py new file mode 100644 index 0000000..09f6252 --- /dev/null +++ b/src/s2python/connection/control_type.py @@ -0,0 +1,11 @@ +import abc + +from s2python.common import EnergyManagementRole +from s2python.connection.async_.connection import S2AsyncConnection + +class RoleHandler(abc.ABC): + @abc.abstractmethod + def get_s2_role(self) -> EnergyManagementRole: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2AsyncConnection) -> None: ... diff --git a/src/s2python/connection/sync/__init__.py b/src/s2python/connection/sync/__init__.py new file mode 100644 index 0000000..5d24e91 --- /dev/null +++ b/src/s2python/connection/sync/__init__.py @@ -0,0 +1,2 @@ +from s2python.connection.sync.connection import S2SyncConnection +from s2python.connection.sync.connection import S2EventHandlerSync \ No newline at end of file diff --git a/src/s2python/connection/sync/connection.py b/src/s2python/connection/sync/connection.py new file mode 100644 index 0000000..30953fb --- /dev/null +++ b/src/s2python/connection/sync/connection.py @@ -0,0 +1,120 @@ +import asyncio +import logging +import threading +import uuid +from typing import Any, Coroutine, Optional, Type, Callable + +from s2python.common import ( + ReceptionStatusValues, +) +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages +from s2python.message import S2Message + +from s2python.common import ReceptionStatus +from s2python.connection.async_.medium.s2_medium import S2MediumConnection +from s2python.connection.async_ import S2AsyncConnection +from s2python.message import S2MessageWithID + +logger = logging.getLogger("s2python") + +S2EventHandlerSync = Callable[["S2SyncConnection", S2ConnectionEvent, Optional[Callable[[], None]]], None] + + +class S2SyncConnection: + _thread: threading.Thread + _eventloop: asyncio.AbstractEventLoop + _async_s2_connection: S2AsyncConnection + + def __init__( # pylint: disable=too-many-arguments + self, + medium: S2MediumConnection, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self._thread = threading.Thread(target=self._run_eventloop) + self._eventloop = asyncio.new_event_loop() if eventloop is None else eventloop + self._async_s2_connection = S2AsyncConnection(medium, self._eventloop) + + def start(self) -> None: + self._thread.start() + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.start(), + self._eventloop, + ).result() + + def _run_eventloop(self) -> None: + logger.debug("Starting synchronous S2 connection event loop in thread %s", self._thread.name) + self._eventloop.run_forever() + logger.debug("Synchronous S2 connection event loop in thread %s has stopped", self._thread.name) + + def stop(self) -> None: + """Stops the S2 connection. + + Note: Ensure this method is called from a different thread than the thread running the S2 connection. + Otherwise it will block waiting on the coroutine _do_stop to terminate successfully but it can't run + the coroutine. A `RuntimeError` will be raised to prevent the indefinite block. + """ + if threading.current_thread() == self._thread: + raise RuntimeError( + "Do not call stop from the thread running the S2 connection. This results in an infinite block!" + ) + if self._eventloop.is_running(): + asyncio.run_coroutine_threadsafe(self._async_s2_connection.stop(), self._eventloop).result() + self._eventloop.stop() + self._thread.join() + logger.info("Stopped the S2 connection.") + + def register_handler(self, s2_message_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerSync) -> None: + """Register a handler for a specific S2 message type. + + :param s2_message_type: The S2 message type to register the handler for. + :param handler: The handler function (asynchronous or normal) which will handle the message. + """ + + async def handle_s2_message_async_wrapper( + _: S2AsyncConnection, + s2_msg: S2ConnectionEvent, + send_okay: Optional[Coroutine[Any, Any, None]], + ) -> None: + await self._eventloop.run_in_executor( + None, + handler, + self, + s2_msg, + lambda: asyncio.run_coroutine_threadsafe(send_okay, self._eventloop).result() if send_okay else None, + ) + + self._async_s2_connection.register_handler(s2_message_type, handle_s2_message_async_wrapper) + + def unregister_handler(self, s2_message_type: Type[S2MessageWithID]) -> None: + self._async_s2_connection.unregister_handler(s2_message_type) + + def send_and_forget( + self, s2_msg: S2Message + ) -> None: + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.send_and_forget(s2_msg), + self._eventloop, + ).result() + + def respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.respond_with_reception_status( + subject_message_id, status, diagnostic_label + ), + self._eventloop, + ).result() + + def send_msg_and_await_reception_status( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + return asyncio.run_coroutine_threadsafe( + self._async_s2_connection.send_msg_and_await_reception_status( + s2_msg, timeout_reception_status, raise_on_error + ), + self._eventloop, + ).result() diff --git a/src/s2python/connection/sync/control_type/class_based.py b/src/s2python/connection/sync/control_type/class_based.py new file mode 100644 index 0000000..8f71be5 --- /dev/null +++ b/src/s2python/connection/sync/control_type/class_based.py @@ -0,0 +1,241 @@ +import abc +import logging +import uuid +from typing import Optional, List, Callable + +from s2python.connection.asset_details import AssetDetails +from s2python.common import ( + Handshake, + EnergyManagementRole, + HandshakeResponse, + SelectControlType, +) +from s2python.connection.sync.connection import S2SyncConnection +from s2python.connection.types import S2ConnectionEvent, S2ConnectionEventsAndMessages +from s2python.version import S2_VERSION + +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped + +from s2python.common import ControlType as ProtocolControlType +from s2python.frbc import FRBCInstruction +from s2python.ppbc import PPBCScheduleInstruction +from s2python.ombc import OMBCInstruction + +logger = logging.getLogger("s2python") + + +class S2ControlType(abc.ABC): + @abc.abstractmethod + def get_protocol_control_type(self) -> ProtocolControlType: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... + + + +class ResourceManagerHandler: + asset_details: AssetDetails + _current_control_type: Optional[S2ControlType] + _control_types: List[S2ControlType] + + def __init__(self, control_types: List[S2ControlType], + asset_details: AssetDetails) -> None: + self.asset_details = asset_details + self._current_control_type = None + self._control_types = control_types + + def get_s2_role(self) -> EnergyManagementRole: + return EnergyManagementRole.RM + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(ConnectionStarted, self._on_connection_started) + connection.register_handler(Handshake, self._on_handshake) + connection.register_handler(HandshakeResponse, self._on_handshake_response) + connection.register_handler(SelectControlType, self._on_select_control_type) + connection.register_handler(ConnectionStopped, self._on_connection_stop) + + def _on_connection_started(self, connection: S2SyncConnection, _: S2ConnectionEvent, __: Optional[Callable[[], None]]) -> None: + connection.send_msg_and_await_reception_status( + Handshake( + message_id=uuid.uuid4(), + role=self.get_s2_role(), + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug( + "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." + ) + + def _on_handshake( + self, _: S2SyncConnection, event: S2ConnectionEvent, send_okay: Optional[Callable[[], None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + event.role, + event.supported_protocol_versions, + ) + send_okay() + + def _on_handshake_response( + self, connection: S2SyncConnection, event: S2ConnectionEvent, send_okay: Optional[Callable[[], None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug("Received HandshakeResponse %s", event.to_json()) + logger.debug( + "CEM selected to use version %s", event.selected_protocol_version + ) + send_okay() + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + connection.send_msg_and_await_reception_status( + self.asset_details.to_resource_manager_details(self._control_types) + ) + + def _on_select_control_type( + self, connection: S2SyncConnection, event: S2ConnectionEvent, send_okay: Optional[Callable[[], None]] + ) -> None: + assert send_okay is not None + if not isinstance(event, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(event), + ) + return + + send_okay + + logger.debug( + "CEM selected control type %s. Activating control type.", + event.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self._control_types + } + selected_control_type = control_types_by_protocol_name.get(event.control_type) + + if self._current_control_type is not None: + self._current_control_type.deactivate(connection) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + self._current_control_type.register_handlers(connection) + self._current_control_type.activate(connection) + + def _on_connection_stop(self, connection: S2SyncConnection, __: S2ConnectionEvent, ___: Optional[Callable[[], None]]): + if self._current_control_type: + self._current_control_type.deactivate(connection) + self._current_control_type = None + + +class FRBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.FILL_RATE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(FRBCInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PPBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_PROFILE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(PPBCScheduleInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class OMBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.OPERATION_MODE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(OMBCInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PEBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + pass + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... + + +class NoControlControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.NOT_CONTROLABLE + + def register_handlers(self, connection: S2SyncConnection) -> None: + pass + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... diff --git a/src/s2python/connection/types.py b/src/s2python/connection/types.py new file mode 100644 index 0000000..abbf59e --- /dev/null +++ b/src/s2python/connection/types.py @@ -0,0 +1,8 @@ +from s2python.connection.connection_events import S2ConnectionEvent + +from typing import Callable, Union, Coroutine, Any, Optional + +from s2python.message import S2MessageWithID + + +S2ConnectionEventsAndMessages = Union[S2MessageWithID, S2ConnectionEvent] diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py index b5d1ab0..896368e 100644 --- a/src/s2python/s2_connection.py +++ b/src/s2python/s2_connection.py @@ -32,7 +32,6 @@ Currency, SelectControlType, ) -from s2python.generated.gen_s2 import CommodityQuantity from s2python.reception_status_awaiter import ReceptionStatusAwaiter from s2python.s2_control_type import S2ControlType from s2python.s2_parser import S2Parser @@ -43,44 +42,7 @@ logger = logging.getLogger("s2python") -@dataclass -class AssetDetails: # pylint: disable=too-many-instance-attributes - resource_id: uuid.UUID - - provides_forecast: bool - provides_power_measurements: List[CommodityQuantity] - - instruction_processing_delay: Duration - roles: List[Role] - currency: Optional[Currency] = None - - name: Optional[str] = None - manufacturer: Optional[str] = None - model: Optional[str] = None - firmware_version: Optional[str] = None - serial_number: Optional[str] = None - - def to_resource_manager_details( - self, control_types: List[S2ControlType] - ) -> ResourceManagerDetails: - return ResourceManagerDetails( - available_control_types=[ - control_type.get_protocol_control_type() - for control_type in control_types - ], - currency=self.currency, - firmware_version=self.firmware_version, - instruction_processing_delay=self.instruction_processing_delay, - manufacturer=self.manufacturer, - message_id=uuid.uuid4(), - model=self.model, - name=self.name, - provides_forecast=self.provides_forecast, - provides_power_measurement_types=self.provides_power_measurements, - resource_id=self.resource_id, - roles=self.roles, - serial_number=self.serial_number, - ) + S2MessageHandler = Union[ diff --git a/src/s2python/s2_control_type.py b/src/s2python/s2_control_type.py index 135f775..02e51bc 100644 --- a/src/s2python/s2_control_type.py +++ b/src/s2python/s2_control_type.py @@ -11,106 +11,3 @@ from s2python.s2_connection import S2Connection, MessageHandlers -class S2ControlType(abc.ABC): - @abc.abstractmethod - def get_protocol_control_type(self) -> ProtocolControlType: ... - - @abc.abstractmethod - def register_handlers(self, handlers: "MessageHandlers") -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... - - -class FRBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.FILL_RATE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(FRBCInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class PPBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.POWER_PROFILE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(PPBCScheduleInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class OMBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.OPERATION_MODE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(OMBCInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class PEBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - pass - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... - - -class NoControlControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.NOT_CONTROLABLE - - def register_handlers(self, handlers: "MessageHandlers") -> None: - pass - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... diff --git a/src/s2python/s2_parser.py b/src/s2python/s2_parser.py index ca17306..0cddf18 100644 --- a/src/s2python/s2_parser.py +++ b/src/s2python/s2_parser.py @@ -38,6 +38,9 @@ M = TypeVar("M", bound=S2MessageComponent) +UnparsedS2Message = Union[dict[Any, Any], str, bytes] + + # May be generated with development_utilities/generate_s2_message_type_to_class.py TYPE_TO_MESSAGE_CLASS: Dict[str, Type[S2Message]] = { "FRBC.ActuatorStatus": FRBCActuatorStatus, @@ -67,13 +70,13 @@ class S2Parser: @staticmethod - def _parse_json_if_required(unparsed_message: Union[dict[Any, Any], str, bytes]) -> dict: + def _parse_json_if_required(unparsed_message: UnparsedS2Message) -> dict: if isinstance(unparsed_message, (str, bytes)): return json.loads(unparsed_message) return unparsed_message @staticmethod - def parse_as_any_message(unparsed_message: Union[dict[Any, Any], str, bytes]) -> S2Message: + def parse_as_any_message(unparsed_message: UnparsedS2Message) -> S2Message: """Parse the message as any S2 python message regardless of message type. :param unparsed_message: The message as a JSON-formatted string or as a json-parsed dictionary. @@ -94,7 +97,7 @@ def parse_as_any_message(unparsed_message: Union[dict[Any, Any], str, bytes]) -> @staticmethod def parse_as_message( - unparsed_message: Union[dict[Any, Any], str, bytes], as_message: Type[M] + unparsed_message: UnparsedS2Message, as_message: Type[M] ) -> M: """Parse the message to a specific S2 python message. @@ -108,7 +111,7 @@ def parse_as_message( @staticmethod def parse_message_type( - unparsed_message: Union[dict[Any, Any], str, bytes], + unparsed_message: UnparsedS2Message, ) -> Optional[S2MessageType]: """Parse only the message type from the unparsed message.