From de59eacf61b19b76ae4622f8486a66003c2a6af1 Mon Sep 17 00:00:00 2001
From: Rene Reimann
Date: Wed, 4 Feb 2026 15:42:29 +0100
Subject: [PATCH 1/6] adding a custom Project 8 sensor logger; currently we
 just start with a copy of the normal PostgresSensorLogger with the insert
 commented out. We will tailor it to the Project 8 database

---
 dripline/extensions/__init__.py               |  1 +
 dripline/extensions/project8_sensor_logger.py | 61 +++++++++++++++++++
 2 files changed, 62 insertions(+)
 create mode 100644 dripline/extensions/project8_sensor_logger.py

diff --git a/dripline/extensions/__init__.py b/dripline/extensions/__init__.py
index c4c5843..616199d 100644
--- a/dripline/extensions/__init__.py
+++ b/dripline/extensions/__init__.py
@@ -13,3 +13,4 @@
 from .thermo_fisher_endpoint import *
 from .ethernet_thermo_fisher_service import *
 from .pfeiffer_endpoint import *
+from .project8_sensor_logger import *
diff --git a/dripline/extensions/project8_sensor_logger.py b/dripline/extensions/project8_sensor_logger.py
new file mode 100644
index 0000000..e89dda2
--- /dev/null
+++ b/dripline/extensions/project8_sensor_logger.py
@@ -0,0 +1,61 @@
+'''
+A Postgres Interface-based logger
+'''
+
+from __future__ import absolute_import
+
+# standard libs
+import logging
+
+# 3rd party libs
+import sqlalchemy
+
+# internal imports
+from dripline.core import AlertConsumer
+from dripline.implementations.postgres_interface import PostgreSQLInterface
+
+__all__ = []
+logger = logging.getLogger(__name__)
+
+
+__all__.append('Project8SensorLogger')
+class Project8SensorLogger(AlertConsumer, PostgreSQLInterface):
+    '''
+    A custom sensor logger tailored to the Project 8 database structure.
+    An alert consumer which converts alert messages into database insertions.
+
+    The assumption is that complex logic dealing with the organization or structure of the particular database should live in the database itself (for example in a view or a trigger), so that here we can simply do an insert.
+    '''
+    def __init__(self, insertion_table_endpoint_name, **kwargs):
+        '''
+        '''
+        AlertConsumer.__init__(self, add_endpoints_now=False, **kwargs)
+        PostgreSQLInterface.__init__(self, **kwargs)
+
+        self.insertion_table_endpoint_name = insertion_table_endpoint_name
+
+        self.connect_to_db(self.auth)
+
+        self.add_endpoints_from_config()
+
+    # add_endpoint is a mess here because of the diamond inheritance, so let's be explicit
+    def add_child(self, endpoint):
+        AlertConsumer.add_child(self, endpoint)
+        self.add_child_table(endpoint)
+
+    def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
+        try:
+            this_data_table = self.sync_children[self.insertion_table_endpoint_name]
+            # combine data sources
+            insert_data = {'timestamp': a_message_timestamp}
+            insert_data.update(a_routing_key_data)
+            insert_data.update(a_payload.to_python())
+            logger.info(f"Inserting from endpoint {self.insertion_table_endpoint_name}; data are:\n{insert_data}")
+            # do the insert
+            #insert_return = this_data_table.do_insert(**insert_data)
+            #logger.debug(f"Return from insertion: {insert_return}")
+            logger.info("finished processing data")
+        except sqlalchemy.exc.SQLAlchemyError as err:
+            logger.critical(f'Received SQL error while doing insert: {err}')
+        except Exception as err:
+            logger.critical(f'An exception was raised while processing a payload to insert: {err}')
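
A note on the flow PATCH 1/6 sets up: process_payload merges three sources into a single row dict before the (still commented-out) insert — the message timestamp, the fields parsed from the routing key, and the decoded payload. A minimal standalone sketch of that merging step; the sample values are invented for illustration:

    # Sketch of the data-merging step in process_payload (PATCH 1/6).
    # All concrete values below are made up; only the merge logic mirrors the patch.
    a_message_timestamp = '2026-02-04T15:42:29Z'
    a_routing_key_data = {'endpoint_name': 'my_sensor'}  # parsed from the alert routing key
    payload = {'value_raw': '42.0', 'value_cal': 42.0}   # stands in for a_payload.to_python()

    insert_data = {'timestamp': a_message_timestamp}
    insert_data.update(a_routing_key_data)               # later updates win on key collisions
    insert_data.update(payload)
    print(insert_data)
    # {'timestamp': '2026-02-04T15:42:29Z', 'endpoint_name': 'my_sensor',
    #  'value_raw': '42.0', 'value_cal': 42.0}
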
From 2315f1a70c6b671ddc736b51a90727207b0cd86e Mon Sep 17 00:00:00 2001
From: Rene Reimann
Date: Wed, 4 Feb 2026 22:29:30 +0100
Subject: [PATCH 2/6] this implementation now looks up the table type in the
 id_map table

---
 dripline/extensions/project8_sensor_logger.py | 26 +++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/dripline/extensions/project8_sensor_logger.py b/dripline/extensions/project8_sensor_logger.py
index e89dda2..6fc671d 100644
--- a/dripline/extensions/project8_sensor_logger.py
+++ b/dripline/extensions/project8_sensor_logger.py
@@ -6,6 +6,7 @@
 
 # standard libs
 import logging
+import re
 
 # 3rd party libs
 import sqlalchemy
@@ -22,17 +23,17 @@ class Project8SensorLogger(AlertConsumer, PostgreSQLInterface):
     '''
     A custom sensor logger tailored to the Project 8 database structure.
-    An alert consumer which converts alert messages into database insertions.
-
-    The assumption is that complex logic dealing with the organization or structure of the particular database should live in the database itself (for example in a view or a trigger), so that here we can simply do an insert.
     '''
-    def __init__(self, insertion_table_endpoint_name, **kwargs):
+    def __init__(self, sensor_type_map_table, data_tables_dict={}, **kwargs):
         '''
+        sensor_type_map_table (str): name of the child endpoint of this instance which provides access to the endpoint_id_map, which stores the sensor type
+        data_tables_dict (dict): dictionary mapping types (in the sensor_type_map_table) to child endpoints of this instance which provide access to the data_table for that type
         '''
         AlertConsumer.__init__(self, add_endpoints_now=False, **kwargs)
         PostgreSQLInterface.__init__(self, **kwargs)
-
-        self.insertion_table_endpoint_name = insertion_table_endpoint_name
+
+        self._sensor_type_map_table = sensor_type_map_table
+        self._data_tables = data_tables_dict
 
         self.connect_to_db(self.auth)
@@ -45,12 +46,21 @@ def add_child(self, endpoint):
         AlertConsumer.add_child(self, endpoint)
         self.add_child_table(endpoint)
 
     def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
         try:
-            this_data_table = self.sync_children[self.insertion_table_endpoint_name]
+            # get the type and table for the sensor
+            this_type = None
+            this_type = self.sync_children[self._sensor_type_map_table].do_select(return_cols=["type"],
+                                                                                  where_eq_dict=a_routing_key_data)
+            #if len(this_type) != 1:
+            #    raise Exception("not registered endpoint")
+            table_name = self._data_tables[this_type[1][0][0]]
+            this_data_table = self.sync_children[table_name]
+
             # combine data sources
             insert_data = {'timestamp': a_message_timestamp}
             insert_data.update(a_routing_key_data)
             insert_data.update(a_payload.to_python())
-            logger.info(f"Inserting from endpoint {self.insertion_table_endpoint_name}; data are:\n{insert_data}")
+            logger.info(f"Inserting {a_routing_key_data} in table {table_type}; data are:\n{insert_data}")
+
             # do the insert
             #insert_return = this_data_table.do_insert(**insert_data)
             #logger.debug(f"Return from insertion: {insert_return}")
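
The [1][0][0] indexing in PATCH 2/6 implies a particular shape for the do_select return value: a pair of (column names, rows). That shape is inferred from the code rather than stated by the patch; under that assumption the lookup behaves as follows:

    # Hypothetical do_select return for one registered sensor, assuming a
    # (column_names, rows) pair; the values are illustrative only.
    this_type = (['type'], [('numeric',)])

    data_tables = {'numeric': 'numeric_table', 'string': 'string_table', 'json': 'json_table'}
    sensor_type = this_type[1][0][0]       # rows -> first row -> first column -> 'numeric'
    table_name = data_tables[sensor_type]  # -> 'numeric_table'

If the sensor is not registered, this_type[1] is an empty list and the indexing raises IndexError; if its type has no entry in data_tables_dict, the dict lookup raises KeyError. The later patches in this series add a guard around exactly this spot.
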
From e54c3d5af47dd099215e0427f12f397e809a5787 Mon Sep 17 00:00:00 2001
From: Rene Reimann
Date: Wed, 4 Feb 2026 22:34:21 +0100
Subject: [PATCH 3/6] adding a todo and an example config file

---
 dripline/extensions/project8_sensor_logger.py |  2 ++
 examples/sensor-logger_mainz.yaml             | 49 +++++++++++++++++++
 2 files changed, 51 insertions(+)
 create mode 100755 examples/sensor-logger_mainz.yaml

diff --git a/dripline/extensions/project8_sensor_logger.py b/dripline/extensions/project8_sensor_logger.py
index 6fc671d..204e4ab 100644
--- a/dripline/extensions/project8_sensor_logger.py
+++ b/dripline/extensions/project8_sensor_logger.py
@@ -50,6 +50,7 @@ def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
             this_type = None
             this_type = self.sync_children[self._sensor_type_map_table].do_select(return_cols=["type"],
                                                                                   where_eq_dict=a_routing_key_data)
+            # add safety check, and see if the key is contained in the table
             #if len(this_type) != 1:
             #    raise Exception("not registered endpoint")
             table_name = self._data_tables[this_type[1][0][0]]
@@ -62,6 +63,7 @@ def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
             this_data_table = self.sync_children[table_name]
 
             # combine data sources
             insert_data = {'timestamp': a_message_timestamp}
             insert_data.update(a_routing_key_data)
             insert_data.update(a_payload.to_python())
             logger.info(f"Inserting {a_routing_key_data} in table {table_type}; data are:\n{insert_data}")
 
             # do the insert
+            ### dry run for testing
             #insert_return = this_data_table.do_insert(**insert_data)
             #logger.debug(f"Return from insertion: {insert_return}")
             logger.info("finished processing data")
diff --git a/examples/sensor-logger_mainz.yaml b/examples/sensor-logger_mainz.yaml
new file mode 100755
index 0000000..9421603
--- /dev/null
+++ b/examples/sensor-logger_mainz.yaml
@@ -0,0 +1,49 @@
+name: sensor_data_logger_mainz
+module: Project8SensorLogger
+dripline_mesh:
+  broker: rabbit-broker
+  broker_port: 5672
+# AlertConsumer Inits
+alert_keys:
+  - "sensor_value.#"
+alert_key_parser_re: 'sensor_value\.(?P<endpoint_name>\w+)'
+# PostgreSQLInterface Inits
+database_name: p8_sc_db
+database_server: postgres
+# SensorLogger Inits
+sensor_type_map_table: id_map
+data_tables_dict:
+  numeric: numeric_table
+  string: string_table
+  json: json_table
+endpoints:
+  - name: id_map
+    module: SQLTable
+    table_name: endpoint_id_map
+  - name: numeric_table
+    module: SQLTable
+    table_name: numeric_data
+    required_insert_names:
+      - endpoint_name
+      - timestamp
+      - value_raw
+    optional_insert_names:
+      - value_cal
+  - name: string_table
+    module: SQLTable
+    table_name: string_data
+    required_insert_names:
+      - endpoint_name
+      - timestamp
+      - value_raw
+    optional_insert_names:
+      - value_cal
+  - name: json_table
+    module: SQLTable
+    table_name: json_data
+    required_insert_names:
+      - endpoint_name
+      - timestamp
+      - value_raw
+    optional_insert_names:
+      - value_cal
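
The alert_key_parser_re in the example config is what produces a_routing_key_data: the named groups of the regex become the dict that is used both for the id_map lookup and for the insert. A quick check of how such a pattern behaves; note that the endpoint_name group name is an assumption here, inferred from the required_insert_names columns:

    import re

    # Parser regex from the example config; the <endpoint_name> group name is
    # inferred from the table columns and may differ in a real deployment.
    parser = re.compile(r'sensor_value\.(?P<endpoint_name>\w+)')

    match = parser.match('sensor_value.cell_temperature')
    print(match.groupdict())  # {'endpoint_name': 'cell_temperature'}
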
From a766bdf3c76e80f8c1313690780b45853b411e68 Mon Sep 17 00:00:00 2001
From: Rene Reimann
Date: Wed, 4 Feb 2026 22:39:27 +0100
Subject: [PATCH 4/6] fixing variable name

---
 dripline/extensions/project8_sensor_logger.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dripline/extensions/project8_sensor_logger.py b/dripline/extensions/project8_sensor_logger.py
index 204e4ab..24b9cad 100644
--- a/dripline/extensions/project8_sensor_logger.py
+++ b/dripline/extensions/project8_sensor_logger.py
@@ -60,7 +60,7 @@ def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
             insert_data = {'timestamp': a_message_timestamp}
             insert_data.update(a_routing_key_data)
             insert_data.update(a_payload.to_python())
-            logger.info(f"Inserting {a_routing_key_data} in table {table_type}; data are:\n{insert_data}")
+            logger.info(f"Inserting {a_routing_key_data} in table {table_name}; data are:\n{insert_data}")
 
             # do the insert
             ### dry run for testing

From 7ab87ac48db7ebb573f8e6d12659fd56c8d6eba6 Mon Sep 17 00:00:00 2001
From: Rene Reimann
Date: Wed, 4 Feb 2026 22:44:19 +0100
Subject: [PATCH 5/6] adding an exception to check if the key is in the id_map
 or not; give a reasonable error message

---
 dripline/extensions/project8_sensor_logger.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/dripline/extensions/project8_sensor_logger.py b/dripline/extensions/project8_sensor_logger.py
index 24b9cad..c6478ef 100644
--- a/dripline/extensions/project8_sensor_logger.py
+++ b/dripline/extensions/project8_sensor_logger.py
@@ -53,7 +53,10 @@ def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
             # add safety check, and see if the key is contained in the table
             #if len(this_type) != 1:
             #    raise Exception("not registered endpoint")
-            table_name = self._data_tables[this_type[1][0][0]]
+            try:
+                table_name = self._data_tables[this_type[1][0][0]]
+            except:
+                raise Exception(f"{a_routing_key_data} is not in database, see {this_type}")
             this_data_table = self.sync_children[table_name]
 
             # combine data sources

From 06d4f2116589c0f24ba42d36ab9dc74cc0c0661c Mon Sep 17 00:00:00 2001
From: Rene Reimann
Date: Fri, 6 Feb 2026 08:41:46 +0100
Subject: [PATCH 6/6] now remove dry-run mode and get it running, cleaning up
 comments from development

---
 dripline/extensions/project8_sensor_logger.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/dripline/extensions/project8_sensor_logger.py b/dripline/extensions/project8_sensor_logger.py
index c6478ef..1eb31cd 100644
--- a/dripline/extensions/project8_sensor_logger.py
+++ b/dripline/extensions/project8_sensor_logger.py
@@ -50,9 +50,7 @@ def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
             this_type = None
             this_type = self.sync_children[self._sensor_type_map_table].do_select(return_cols=["type"],
                                                                                   where_eq_dict=a_routing_key_data)
-            # add safety check, and see if the key is contained in the table
-            #if len(this_type) != 1:
-            #    raise Exception("not registered endpoint")
+            # add safety check: see if the key is contained in the table; otherwise generate a meaningful error message
             try:
                 table_name = self._data_tables[this_type[1][0][0]]
             except:
@@ -66,9 +64,8 @@ def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
             logger.info(f"Inserting {a_routing_key_data} in table {table_name}; data are:\n{insert_data}")
 
             # do the insert
-            ### dry run for testing
-            #insert_return = this_data_table.do_insert(**insert_data)
-            #logger.debug(f"Return from insertion: {insert_return}")
+            insert_return = this_data_table.do_insert(**insert_data)
+            logger.debug(f"Return from insertion: {insert_return}")
             logger.info("finished processing data")
         except sqlalchemy.exc.SQLAlchemyError as err:
             logger.critical(f'Received SQL error while doing insert: {err}')
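
Taken together, the final process_payload does: look up the sensor's type in the id_map, pick the matching data table, merge timestamp, routing-key fields, and payload, and insert. A self-contained dry run of that control flow with stubbed tables; everything here is invented scaffolding, and the bare except: of the patch (which would also swallow unrelated errors) is narrowed to the two expected failure modes — KeyError for an unmapped type, IndexError for an empty lookup result:

    # Stub-based walkthrough of the final control flow; names and data are invented.
    sensor_type_rows = {'my_sensor': (['type'], [('numeric',)])}  # stands in for the id_map
    data_tables = {'numeric': 'numeric_table'}

    def process(endpoint_name, timestamp, payload):
        this_type = sensor_type_rows.get(endpoint_name, ([], []))  # stands in for do_select
        try:
            table_name = data_tables[this_type[1][0][0]]
        except (KeyError, IndexError):
            raise Exception(f"{endpoint_name} is not in database, see {this_type}")
        insert_data = {'timestamp': timestamp, 'endpoint_name': endpoint_name, **payload}
        print(f"would insert into {table_name}: {insert_data}")   # stands in for do_insert

    process('my_sensor', '2026-02-06T08:41:46Z', {'value_raw': '42.0'})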