Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dripline/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
from .ethernet_huber_service import *
from .ethernet_modbus_service import *
from .pfeiffer_endpoint import *
from .project8_sensor_logger import *
73 changes: 73 additions & 0 deletions dripline/extensions/project8_sensor_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
'''
A Postgres Interface-based logger
'''

from __future__ import absolute_import

# standard libs
import logging
import re

# 3rd party libs
import sqlalchemy

# internal imports
from dripline.core import AlertConsumer
from dripline.implementations.postgres_interface import PostgreSQLInterface

__all__ = []
logger = logging.getLogger(__name__)


__all__.append('Project8SensorLogger')
class Project8SensorLogger(AlertConsumer, PostgreSQLInterface):
'''
A custom sensor logger tailored to the project 8 database structure.
'''
def __init__(self, sensor_type_map_table, data_tables_dict={}, **kwargs):
'''
sensor_type_map_table (str): name of the child endpoint of this instance which provides access to the endpoint_id_map, which stores the sensor type
data_tables_dict (dict): dictionary mapping types (in the sensor_type_map_table) to child endpoints of this instance which provide access to the data_table for that type
'''
AlertConsumer.__init__(self, add_endpoints_now=False, **kwargs)
PostgreSQLInterface.__init__(self, **kwargs)

self._sensor_type_map_table = sensor_type_map_table
self._data_tables = data_tables_dict

self.connect_to_db(self.auth)

self.add_endpoints_from_config()

# add_endpoint is a mess here because of the diamond inheritance, so let's be explicit
def add_child(self, endpoint):
AlertConsumer.add_child(self, endpoint)
self.add_child_table(endpoint)

def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp):
try:
# get the type and table for the sensor
this_type = None
this_type = self.sync_children[self._sensor_type_map_table].do_select(return_cols=["type"],
where_eq_dict=a_routing_key_data)
# add safty check, and see if the key is contained in the table otherwise generate meaningful error message
try:
table_name = self._data_tables[this_type[1][0][0]]
except:
raise Exception(f"{a_routing_key_data} is not in database, see {this_type}")
this_data_table = self.sync_children[table_name]

# combine data sources
insert_data = {'timestamp': a_message_timestamp}
insert_data.update(a_routing_key_data)
insert_data.update(a_payload.to_python())
logger.info(f"Inserting {a_routing_key_data} in table {table_name}; data are:\n{insert_data}")

# do the insert
insert_return = this_data_table.do_insert(**insert_data)
logger.debug(f"Return from insertion: {insert_return}")
logger.info("finished processing data")
except sqlalchemy.exc.SQLAlchemyError as err:
logger.critical(f'Received SQL error while doing insert: {err}')
except Exception as err:
logger.critical(f'An exception was raised while processing a payload to insert: {err}')
49 changes: 49 additions & 0 deletions examples/sensor-logger_mainz.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: sensor_data_logger_mainz
module: Project8SensorLogger
dripline_mesh:
broker: rabbit-broker
broker_port: 5672
# AlertConsumer Inits
alert_keys:
- "sensor_value.#"
alert_key_parser_re: 'sensor_value\.(?P<endpoint_name>\w+)'
# PostgreSQLInterface Inits
database_name: p8_sc_db
database_server: postgres
# SensorLogger Inits
sensor_type_map_table: id_map
data_tables_dict:
numeric: numeric_table
string: string_table
json: json_table
endpoints:
- name: id_map
module: SQLTable
table_name: endpoint_id_map
- name: numeric_table
module: SQLTable
table_name: numeric_data
required_insert_names:
- endpoint_name
- timestamp
- value_raw
optional_insert_names:
- value_cal
- name: string_table
module: SQLTable
table_name: string_data
required_insert_names:
- endpoint_name
- timestamp
- value_raw
optional_insert_names:
- value_cal
- name: json_table
module: SQLTable
table_name: json_data
required_insert_names:
- endpoint_name
- timestamp
- value_raw
optional_insert_names:
- value_cal