From 1b6d5db92e061a5fb91dbf95b6382f46ab155973 Mon Sep 17 00:00:00 2001 From: Linyao Li Date: Mon, 7 Jul 2025 11:48:40 -0500 Subject: [PATCH 1/6] Add ECS task for mf enforcement digest email notification --- .../modules/metadata_forms/tasks/__init__.py | 1 + ...data_form_enforcement_email_alerts_task.py | 159 ++++++++++++++++++ deploy/stacks/container.py | 29 ++++ 3 files changed, 189 insertions(+) create mode 100644 backend/dataall/modules/metadata_forms/tasks/__init__.py create mode 100644 backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py diff --git a/backend/dataall/modules/metadata_forms/tasks/__init__.py b/backend/dataall/modules/metadata_forms/tasks/__init__.py new file mode 100644 index 000000000..da597f309 --- /dev/null +++ b/backend/dataall/modules/metadata_forms/tasks/__init__.py @@ -0,0 +1 @@ +"""Code of the long-running tasks that run in ECS""" diff --git a/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py new file mode 100644 index 000000000..f79cc9f01 --- /dev/null +++ b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py @@ -0,0 +1,159 @@ +import logging +import os +import sys +import json +from dataall.base.db import get_engine +from collections import defaultdict +from typing import Dict, DefaultDict +from dataall.base.loader import load_modules, ImportMode +from dataall.base.services.service_provider_factory import ServiceProviderFactory +from dataall.modules.metadata_forms.db.metadata_form_repository import MetadataFormRepository +from dataall.modules.metadata_forms.services.metadata_form_enforcement_service import MetadataFormEnforcementService +from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService +from dataall.modules.metadata_forms.db.enums import ENTITY_LINK_MAP + + +log = logging.getLogger(__name__) + + +def _get_affected_entities_per_user(session) -> DefaultDict[str, Dict[str, Dict[str, str]]]: + """ + Iterate over all metadata forms to fetch all enforcement rules, + and for each enforcement rule, identify affected entities without a metadata form attached. + + Gather entity owner information, resolve group owners to individual users, + and prepare a mapping of users to the unique entities they own that are affected. + + Returns: + A defaultdict where: + - key = user email id (str) + - value = dict mapping entity URI (str) to the entity object (dict with fields like name, type, owner, attached) + """ + + user_to_entities: DefaultDict[str, Dict[str, Dict[str, str]]] = defaultdict(dict) + identityProvider = ServiceProviderFactory.get_service_provider_instance() + + # find all metadata forms + all_mfs = MetadataFormRepository.query_user_metadata_forms(session, is_da_admin=True, groups=None, env_uris=None, org_uris=None, filter=None).all() + log.info(f'Found {len(all_mfs)} metadata forms') + + # for a given form, get all enforcement rules + for mf in all_mfs: + log.info(f'Processing metadata form {mf.uri}') + mf_enforcement_rules = MetadataFormRepository.list_mf_enforcement_rules(session, mf.uri) + + log.info(f'Found {len(mf_enforcement_rules)} enforcement rules for metadata form {mf.uri}') + for rule in mf_enforcement_rules: + log.info(f'Processing enforcement rule {rule.uri}') + + affected_entities = MetadataFormEnforcementService.get_affected_entities(uri=rule.uri, session=session) + log.info(f'Found {len(affected_entities)} affected entities') + + for entity in affected_entities: + if entity['attached'] is None: + if entity['owner']: + owner_group = entity["owner"] + log.info(f'Fetching members from entity owner group {owner_group}') + + user_email_ids = identityProvider.get_user_emailids_from_group(groupName=owner_group) + for email_id in user_email_ids: + user_to_entities[email_id][entity['uri']] = entity + + return user_to_entities + + +def send_reminder_email(engine): + with engine.scoped_session() as session: + log.info('Running Metadata Form Enforcement Email Alert Task') + + user_to_entities = _get_affected_entities_per_user(session) + for email_id, entities in user_to_entities.items(): + email_body = _construct_email_body(entities.values()) + log.debug(f'Sending email to user: {email_id} with email content: {email_body}') + subject = 'Action Required | Data.all metadata compliance digest' + + try: + SESEmailNotificationService.send_email_task( + subject=subject, message=email_body, recipient_groups_list=[], recipient_email_list=[email_id] + ) + except Exception as e: + err_msg = f'Failed to send email in weekly metadata form enforcement reminder task due to: {e}' + log.exception(err_msg) + + log.info('Completed Metadata Form Enforcement Email Alert Task') + + +def _construct_email_body(entities): + + msg_heading = f"""Dear User,

+ + The following resources that you own in Data.all are missing one or more required metadata forms. + Please review them and attach the necessary metadata forms to ensure compliance.

+ """ + + msg_content = _create_table_for_resource(entities) + + msg_footer = """Your prompt attention in this matter is greatly appreciated. +

Best regards, +
The Data.all Team + """ + + return msg_heading + msg_content + msg_footer + + +def _create_table_for_resource(entities): + table_heading = """ + + + Entity Type + + + Name + + + Link + + + """ + table_body = """""" + + for entity in entities: + entity_type = entity["type"] + + # get entity link + if entity_type in ENTITY_LINK_MAP: + entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}' + entity_link_text = f'View Resource' + else: + entity_link_text = "N/A" + + table_body += f""" + + + {entity['type']} + + + {entity['name']} + + + {entity_link_text} + + + """ + table = f""" + + {table_heading} + {table_body} +
+
+
+ """ + + return table + + +if __name__ == '__main__': + load_modules(modes={ImportMode.API}) + ENVNAME = os.environ.get('envname', 'local') + ENGINE = get_engine(envname=ENVNAME) + send_reminder_email(engine=ENGINE) diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index a2a042302..d71d27b4c 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -208,6 +208,7 @@ def __init__( self.add_omics_fetch_workflows_task() self.add_persistent_email_reminders_task() self.add_share_expiration_task() + self.add_metadata_form_enforcement_email_reminders_task() @run_if(['modules.s3_datasets.active', 'modules.dashboards.active']) def add_catalog_indexer_task(self): @@ -500,6 +501,34 @@ def add_share_expiration_task(self): ) self.ecs_task_definitions_families.append(scheduled_task.task_definition.family) + @run_if(['modules.s3_datasets.active', 'modules.metadata_forms.active']) + def add_metadata_form_enforcement_email_reminders_task(self): + self.add_custom_config_context(custom_auth=self._custom_auth) + mf_enforcement_email_reminders_task, mf_enforcement_email_reminders_task_def = self.set_scheduled_task( + cluster=self.ecs_cluster, + command=[ + 'python3.9', + '-m', + 'dataall.modules.metadata_forms.tasks.metadata_form_enforcement_email_alerts_task', + ], + container_id='container', + ecr_repository=self._ecr_repository, + environment=self.env_vars, + image_tag=self._cdkproxy_image_tag, + log_group=self.create_log_group( + self._envname, self._resource_prefix, log_group_name='metadata-form-email-reminders' + ), + schedule_expression=Schedule.expression('cron(0 15 ? * 2 *)'), # Run at 3PM UTC (10AM CST) every Monday + scheduled_task_id=f'{self._resource_prefix}-{self._envname}-metadata-form-email-reminders-schedule', + task_id=f'{self._resource_prefix}-{self._envname}-metadata-form-email-reminders', + task_role=self.task_role, + vpc=self._vpc, + security_group=self.scheduled_tasks_sg, + prod_sizing=self._prod_sizing, + ) + + self.ecs_task_definitions_families.append(mf_enforcement_email_reminders_task.task_definition.family) + def create_ecs_security_groups(self, envname, resource_prefix, vpc, vpce_connection, s3_prefix_list, lambdas): scheduled_tasks_sg = ec2.SecurityGroup( self, From f8776c4f2ff65594b6459541865bb3b80b0762ce Mon Sep 17 00:00:00 2001 From: Linyao Li Date: Mon, 7 Jul 2025 12:18:58 -0500 Subject: [PATCH 2/6] Add one-time notification when enforcement rule is created --- .../modules/metadata_forms/db/enums.py | 14 ++ .../metadata_form_enforcement_service.py | 159 ++++++++++++------ 2 files changed, 124 insertions(+), 49 deletions(-) diff --git a/backend/dataall/modules/metadata_forms/db/enums.py b/backend/dataall/modules/metadata_forms/db/enums.py index 9b5903af8..569db5b47 100644 --- a/backend/dataall/modules/metadata_forms/db/enums.py +++ b/backend/dataall/modules/metadata_forms/db/enums.py @@ -65,3 +65,17 @@ class MetadataFormUserRoles(GraphQLEnumMapper): MetadataFormEntityTypes.Bucket.value: MetadataFormEnforcementScope.Dataset, MetadataFormEntityTypes.Share.value: MetadataFormEnforcementScope.Dataset, } + +ENTITY_LINK_MAP = { + MetadataFormEntityTypes.Organization.value: 'organizations', + MetadataFormEntityTypes.Environment.value: 'environments', + MetadataFormEntityTypes.S3Dataset.value: 's3-datasets', + MetadataFormEntityTypes.Worksheet.value: 'worksheets', + MetadataFormEntityTypes.Dashboard.value: 'dashboards', + MetadataFormEntityTypes.Notebook.value: 'notebooks', + MetadataFormEntityTypes.MLStudioUser.value: 'mlstudio', + MetadataFormEntityTypes.Pipeline.value: 'pipelines', + MetadataFormEntityTypes.Table.value: 's3-datasets/table', + MetadataFormEntityTypes.Folder.value: 's3-datasets/folder', + MetadataFormEntityTypes.Share.value: 'shares', +} diff --git a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py index cda4e77f4..9b3e9f40e 100644 --- a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py +++ b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py @@ -1,4 +1,6 @@ -from typing import List +import os +import logging +from contextlib import nullcontext from dataall.base.context import get_context from dataall.base.db import exceptions from dataall.base.db.paginator import paginate_list @@ -13,6 +15,7 @@ MetadataFormEnforcementScope, MetadataFormEnforcementSeverity, ENTITY_SCOPE_BY_TYPE, + ENTITY_LINK_MAP, ) from dataall.core.metadata_manager.metadata_form_entity_manager import ( MetadataFormEntityTypes, @@ -28,6 +31,9 @@ ENFORCE_METADATA_FORM, ) from dataall.modules.notifications.db.notification_repositories import NotificationRepository +from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService + +log = logging.getLogger(__name__) class MetadataFormEnforcementRequestValidationService: @@ -90,52 +96,55 @@ def create_mf_enforcement_rule(uri, data): notification_type='METADATA_FORM_ENFORCED', ) + MetadataFormEnforcementService.notify_affected_entity_owner(mf=mf, entity=entity, + recipient_groups_list=[entity['owner']]) + return rule @staticmethod - def get_affected_organizations(uri, rule=None) -> List[Organization]: - with get_context().db_engine.scoped_session() as session: - if not rule: - rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri) - if rule.level == MetadataFormEnforcementScope.Global.value: - return OrganizationRepository.query_all_active_organizations(session) - if rule.level == MetadataFormEnforcementScope.Organization.value: - return [OrganizationRepository.get_organization_by_uri(session, rule.homeEntity)] - return [] + def get_affected_organizations(session, uri, rule=None) -> List[Organization]: + if not rule: + rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri) + if rule.level == MetadataFormEnforcementScope.Global.value: + return OrganizationRepository.query_all_active_organizations(session) + if rule.level == MetadataFormEnforcementScope.Organization.value: + return [OrganizationRepository.get_organization_by_uri(session, rule.homeEntity)] + return [] @staticmethod - def get_affected_environments(uri, rule=None) -> List[Environment]: - with get_context().db_engine.scoped_session() as session: - if not rule: - rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri) - if rule.level == MetadataFormEnforcementScope.Global.value: - return EnvironmentRepository.query_all_active_environments(session) - if rule.level == MetadataFormEnforcementScope.Organization.value: - return OrganizationRepository.query_organization_environments( - session, uri=rule.homeEntity, filter=None - ).all() - if rule.level == MetadataFormEnforcementScope.Environment.value: - return [EnvironmentRepository.get_environment_by_uri(session, rule.homeEntity)] - return [] + def get_affected_environments(session, uri, rule=None) -> List[Environment]: + if not rule: + rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri) + if rule.level == MetadataFormEnforcementScope.Global.value: + return EnvironmentRepository.query_all_active_environments(session) + if rule.level == MetadataFormEnforcementScope.Organization.value: + return OrganizationRepository.query_organization_environments( + session, uri=rule.homeEntity, filter=None + ).all() + if rule.level == MetadataFormEnforcementScope.Environment.value: + return [EnvironmentRepository.get_environment_by_uri(session, rule.homeEntity)] + return [] @staticmethod - def get_affected_datasets(uri, rule=None) -> List[DatasetBase]: - with get_context().db_engine.scoped_session() as session: - if not rule: - rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri) - if rule.level == MetadataFormEnforcementScope.Global.value: - return DatasetListRepository.query_datasets(session).all() - if rule.level == MetadataFormEnforcementScope.Organization.value: - return DatasetListRepository.query_datasets(session, organizationUri=rule.homeEntity).all() - if rule.level == MetadataFormEnforcementScope.Environment.value: - return DatasetListRepository.query_datasets(session, environmentUri=rule.homeEntity).all() - if rule.level == MetadataFormEnforcementScope.Dataset.value: - return [DatasetBaseRepository.get_dataset_by_uri(session, rule.homeEntity)] - return [] + def get_affected_datasets(session, uri, rule=None) -> List[DatasetBase]: + if not rule: + rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri) + if rule.level == MetadataFormEnforcementScope.Global.value: + return DatasetListRepository.query_datasets(session).all() + if rule.level == MetadataFormEnforcementScope.Organization.value: + return DatasetListRepository.query_datasets(session, organizationUri=rule.homeEntity).all() + if rule.level == MetadataFormEnforcementScope.Environment.value: + return DatasetListRepository.query_datasets(session, environmentUri=rule.homeEntity).all() + if rule.level == MetadataFormEnforcementScope.Dataset.value: + return [DatasetBaseRepository.get_dataset_by_uri(session, rule.homeEntity)] + return [] @staticmethod - def get_attachement_for_rule(rule, entityUri) -> AttachedMetadataForm: - with get_context().db_engine.scoped_session() as session: + def get_attachement_for_rule(rule, entityUri, session=None) -> AttachedMetadataForm: + # Pass in a session object if it exists; otherwise, get it from request context + session_context = nullcontext(session) if session else get_context().db_engine.scoped_session() + + with session_context as session: return MetadataFormRepository.query_all_attached_metadata_forms_for_entity( session, entityUri=entityUri, @@ -144,39 +153,43 @@ def get_attachement_for_rule(rule, entityUri) -> AttachedMetadataForm: ).first() @staticmethod - def form_affected_entity_object(type, entity: MetadataFormEntity, rule): + def form_affected_entity_object(type, entity: MetadataFormEntity, rule, session=None): return { 'type': type, 'name': entity.entity_name(), 'uri': entity.uri(), 'owner': entity.owner_name(), - 'attached': MetadataFormEnforcementService.get_attachement_for_rule(rule, entity.uri()), + 'attached': MetadataFormEnforcementService.get_attachement_for_rule(rule, entity.uri(), session), } @staticmethod - def get_affected_entities(uri, rule=None): + def get_affected_entities(uri, rule=None, session=None): affected_entities = [] - with get_context().db_engine.scoped_session() as session: + + # Pass in a session object if it exists; otherwise, get it from request context + session_context = nullcontext(session) if session else get_context().db_engine.scoped_session() + + with session_context as session: if not rule: rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri) - orgs = MetadataFormEnforcementService.get_affected_organizations(uri, rule) + orgs = MetadataFormEnforcementService.get_affected_organizations(session, uri, rule) if MetadataFormEntityTypes.Organization.value in rule.entityTypes: affected_entities.extend( [ MetadataFormEnforcementService.form_affected_entity_object( - MetadataFormEntityTypes.Organization.value, o, rule + MetadataFormEntityTypes.Organization.value, o, rule, session ) for o in orgs ] ) - envs = MetadataFormEnforcementService.get_affected_environments(uri, rule) + envs = MetadataFormEnforcementService.get_affected_environments(session, uri, rule) if MetadataFormEntityTypes.Environment.value in rule.entityTypes: affected_entities.extend( [ MetadataFormEnforcementService.form_affected_entity_object( - MetadataFormEntityTypes.Environment.value, e, rule + MetadataFormEntityTypes.Environment.value, e, rule, session ) for e in envs ] @@ -186,11 +199,11 @@ def get_affected_entities(uri, rule=None): if MetadataFormEntityManager.is_registered( MetadataFormEntityTypes.S3Dataset.value ) or MetadataFormEntityManager.is_registered(MetadataFormEntityTypes.RDDataset.value): - datasets = MetadataFormEnforcementService.get_affected_datasets(uri, rule) + datasets = MetadataFormEnforcementService.get_affected_datasets(session, uri, rule) affected_entities.extend( [ MetadataFormEnforcementService.form_affected_entity_object( - ds.datasetType.value + '-Dataset', ds, rule + ds.datasetType.value + '-Dataset', ds, rule, session ) for ds in datasets if ds.datasetType.value + '-Dataset' in rule.entityTypes @@ -222,7 +235,7 @@ def get_affected_entities(uri, rule=None): all_entities = all_entities.all() affected_entities.extend( [ - MetadataFormEnforcementService.form_affected_entity_object(entity_type, e, rule) + MetadataFormEnforcementService.form_affected_entity_object(entity_type, e, rule, session) for e in all_entities ] ) @@ -344,3 +357,51 @@ def get_rules_that_affect_entity(entity_type, entity_uri): r.metadataFormName = MetadataFormRepository.get_metadata_form(session, r.metadataFormUri).name return all_rules + + @staticmethod + def notify_affected_entity_owner(mf, entity, recipient_groups_list=None, recipient_email_ids=None): + """ + Sends a one-time notification to entity owners when a new enforcement rule is created and their entities are affected. + """ + + try: + if recipient_groups_list is None: + recipient_groups_list = [] + if recipient_email_ids is None: + recipient_email_ids = [] + + entity_type = entity["type"] + if entity_type in ENTITY_LINK_MAP: + entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}' + else: + entity_link = f'/console/metadata-forms/{mf["uri"]}' + + entity_link_text = '' + if os.environ.get('frontend_domain_url'): + entity_link_text = ( + f'

Please visit data.all link ' + f'to attach the required metadata form: "{mf.name}. ' + ) + + subject = ( + f'ACTION REQUIRED: Data.all | Metadata form "{mf.name}" required for {entity["uri"]}' + ) + + msg_intro = f"""Dear User,

+ The metadata form "{mf.name}" is required for your resource: {entity["uri"]} ({entity["type"]}). + """ + entity_link_text + + msg_end = """

Your prompt attention in this matter is greatly appreciated. +

Best regards, +
The Data.all Team + """ + + msg = msg_intro + msg_end + + SESEmailNotificationService.send_email_task( + subject, msg, recipient_groups_list, recipient_email_ids + ) + except Exception as e: + err_msg = f"Failed to send notification email to affected entity owners: {e}" + log.exception(err_msg) From fec3a50ff3886689c0504158260360997b72f94a Mon Sep 17 00:00:00 2001 From: Linyao Li Date: Mon, 7 Jul 2025 15:04:10 -0500 Subject: [PATCH 3/6] Send email notifications in async way and fix error handling --- .../metadata_form_enforcement_service.py | 35 +++++++++++++++---- ...data_form_enforcement_email_alerts_task.py | 19 ++++++---- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py index 9b3e9f40e..8f76ca168 100644 --- a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py +++ b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py @@ -3,6 +3,8 @@ from contextlib import nullcontext from dataall.base.context import get_context from dataall.base.db import exceptions +from dataall.core.tasks.db.task_models import Task +from dataall.core.tasks.service_handlers import Worker from dataall.base.db.paginator import paginate_list from dataall.core.environment.db.environment_repositories import EnvironmentRepository from dataall.core.environment.db.environment_models import Environment @@ -96,8 +98,16 @@ def create_mf_enforcement_rule(uri, data): notification_type='METADATA_FORM_ENFORCED', ) - MetadataFormEnforcementService.notify_affected_entity_owner(mf=mf, entity=entity, - recipient_groups_list=[entity['owner']]) + try: + owner = entity['owner'] + # skip if the owner is an individual user + if entity['attached'] is None and "@" not in owner: + MetadataFormEnforcementService.notify_affected_entity_owner(session=session, mf=mf, + entity=entity, + recipient_groups_list=[owner]) + except Exception as e: + log.warning(f"Skipping invalid or missing owner group {owner}: {e}") + continue return rule @@ -359,7 +369,7 @@ def get_rules_that_affect_entity(entity_type, entity_uri): return all_rules @staticmethod - def notify_affected_entity_owner(mf, entity, recipient_groups_list=None, recipient_email_ids=None): + def notify_affected_entity_owner(session, mf, entity, recipient_groups_list=None, recipient_email_ids=None): """ Sends a one-time notification to entity owners when a new enforcement rule is created and their entities are affected. """ @@ -381,7 +391,7 @@ def notify_affected_entity_owner(mf, entity, recipient_groups_list=None, recipie entity_link_text = ( f'

Please visit data.all link ' - f'to attach the required metadata form: "{mf.name}. ' + f'to attach the required metadata form: "{mf.name}".' ) subject = ( @@ -399,9 +409,22 @@ def notify_affected_entity_owner(mf, entity, recipient_groups_list=None, recipie msg = msg_intro + msg_end - SESEmailNotificationService.send_email_task( - subject, msg, recipient_groups_list, recipient_email_ids + notification_task: Task = Task( + action='notification.service', + targetUri=entity["uri"], + payload={ + 'notificationType': 'email', + 'subject': subject, + 'message': msg, + 'recipientGroupsList': recipient_groups_list, + 'recipientEmailList': recipient_email_ids, + }, ) + session.add(notification_task) + session.commit() + + Worker.queue(engine=get_context().db_engine, task_ids=[notification_task.taskUri]) + except Exception as e: err_msg = f"Failed to send notification email to affected entity owners: {e}" log.exception(err_msg) diff --git a/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py index f79cc9f01..bbd397c9f 100644 --- a/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py +++ b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py @@ -52,12 +52,19 @@ def _get_affected_entities_per_user(session) -> DefaultDict[str, Dict[str, Dict[ for entity in affected_entities: if entity['attached'] is None: if entity['owner']: - owner_group = entity["owner"] - log.info(f'Fetching members from entity owner group {owner_group}') - - user_email_ids = identityProvider.get_user_emailids_from_group(groupName=owner_group) - for email_id in user_email_ids: - user_to_entities[email_id][entity['uri']] = entity + owner = entity["owner"] + log.info(f'Fetching members from entity owner {owner}') + + try: + user_email_ids = identityProvider.get_user_emailids_from_group(groupName=owner) + except Exception as e: + # We consider individual owner as invalid for now + log.warning(f"Skipping invalid or missing owner group {owner}: {e}") + continue + + if user_email_ids: + for email_id in user_email_ids: + user_to_entities[email_id][entity['uri']] = entity return user_to_entities From e1286fb41ec6e82b575a7b21a8e8722a58e1f8e9 Mon Sep 17 00:00:00 2001 From: Linyao Li Date: Mon, 7 Jul 2025 15:04:40 -0500 Subject: [PATCH 4/6] Add support for rdd links --- backend/dataall/modules/metadata_forms/db/enums.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/dataall/modules/metadata_forms/db/enums.py b/backend/dataall/modules/metadata_forms/db/enums.py index 569db5b47..12cc57ff5 100644 --- a/backend/dataall/modules/metadata_forms/db/enums.py +++ b/backend/dataall/modules/metadata_forms/db/enums.py @@ -70,6 +70,7 @@ class MetadataFormUserRoles(GraphQLEnumMapper): MetadataFormEntityTypes.Organization.value: 'organizations', MetadataFormEntityTypes.Environment.value: 'environments', MetadataFormEntityTypes.S3Dataset.value: 's3-datasets', + MetadataFormEntityTypes.RDDataset.value: 'redshift-datasets', MetadataFormEntityTypes.Worksheet.value: 'worksheets', MetadataFormEntityTypes.Dashboard.value: 'dashboards', MetadataFormEntityTypes.Notebook.value: 'notebooks', From e7fb14c82c45484844684fe8ded244950afd91e7 Mon Sep 17 00:00:00 2001 From: Linyao Li Date: Tue, 8 Jul 2025 09:25:56 -0500 Subject: [PATCH 5/6] Send one time email notification in the async function --- .../handlers/metadata_form_handler.py | 5 ++- .../metadata_form_enforcement_service.py | 42 ++++++++++++++----- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py b/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py index 510647e95..26dd0a7ef 100644 --- a/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py +++ b/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py @@ -9,5 +9,8 @@ class EcsMetadataFormHandler: def notify_owners_of_enforcement(engine, task: Task): with engine.scoped_session() as session: MetadataFormEnforcementService.notify_owners_of_enforcement( - session=session, rule_uri=task.targetUri, mf_name=task.payload.get('mf_name', 'Unknown name') + session=session, + rule_uri=task.targetUri, + mf_name=task.payload.get('mf_name', 'Unknown name'), + mf_uri=task.payload.get('mf_uri', 'Unknown uri') ) diff --git a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py index fb9bea27d..d632459af 100644 --- a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py +++ b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py @@ -1,7 +1,6 @@ import os import logging from typing import List -from contextlib import nullcontext from dataall.base.context import get_context from dataall.base.db import exceptions, Engine from dataall.base.db.paginator import paginate_list @@ -34,7 +33,6 @@ ENFORCE_METADATA_FORM, ) from dataall.modules.notifications.db.notification_repositories import NotificationRepository -from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService log = logging.getLogger(__name__) @@ -78,17 +76,36 @@ def _get_entity_uri(session, data): return data.get('homeEntity') @classmethod - def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str) -> bool: + def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str, mf_uri: str) -> bool: affected_entities = MetadataFormEnforcementService._get_affected_entities(session=session, uri=rule_uri) for entity in affected_entities: - if entity['owner']: + owner = entity.get('owner') + + # Send in-app notification (popover) + if owner: NotificationRepository.create_notification( session, - recipient=entity['owner'], + recipient=owner, target_uri=f'{entity["uri"]}|{entity["type"]}', message=f'Usage of metadata form "{mf_name}" was enforced for {entity["uri"]} {entity["type"]}', notification_type='METADATA_FORM_ENFORCED', ) + + # Send notification by email + try: + # skip if the owner is an individual user + if entity['attached'] is None and "@" not in owner: + MetadataFormEnforcementService.notify_entity_owner_by_email( + session=session, + mf_name=mf_name, + mf_uri=mf_uri, + entity=entity, + recipient_groups_list=[owner] + ) + except Exception as e: + log.warning(f"Skipping invalid or missing owner group {owner}: {e}") + continue + return True @staticmethod @@ -105,7 +122,10 @@ def create_mf_enforcement_rule(uri, data): task = Task( targetUri=rule.uri, action='metadata_form.enforcement.notify', - payload={'mf_name': mf.name}, + payload={ + 'mf_name': mf.name, + 'mf_uri': mf.uri + }, ) session.add(task) session.commit() @@ -367,7 +387,7 @@ def get_rules_that_affect_entity(entity_type, entity_uri): return all_rules @staticmethod - def notify_affected_entity_owner(session, mf, entity, recipient_groups_list=None, recipient_email_ids=None): + def notify_entity_owner_by_email(session, mf_name, mf_uri, entity, recipient_groups_list=None, recipient_email_ids=None): """ Sends a one-time notification to entity owners when a new enforcement rule is created and their entities are affected. """ @@ -382,22 +402,22 @@ def notify_affected_entity_owner(session, mf, entity, recipient_groups_list=None if entity_type in ENTITY_LINK_MAP: entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}' else: - entity_link = f'/console/metadata-forms/{mf["uri"]}' + entity_link = f'/console/metadata-forms/{mf_uri}' entity_link_text = '' if os.environ.get('frontend_domain_url'): entity_link_text = ( f'

Please visit data.all link ' - f'to attach the required metadata form: "{mf.name}".' + f'to attach the required metadata form: "{mf_name}".' ) subject = ( - f'ACTION REQUIRED: Data.all | Metadata form "{mf.name}" required for {entity["uri"]}' + f'ACTION REQUIRED: Data.all | Metadata form "{mf_name}" required for {entity["uri"]}' ) msg_intro = f"""Dear User,

- The metadata form "{mf.name}" is required for your resource: {entity["uri"]} ({entity["type"]}). + The metadata form "{mf_name}" is required for your resource: {entity["uri"]} ({entity["type"]}). """ + entity_link_text msg_end = """

Your prompt attention in this matter is greatly appreciated. From cabdbdc3d12fee0c0fce6aa0b417ca916121a0ad Mon Sep 17 00:00:00 2001 From: Linyao Li Date: Wed, 9 Jul 2025 11:16:12 -0500 Subject: [PATCH 6/6] Modify email template --- .../services/metadata_form_enforcement_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py index d632459af..f3ab3aef0 100644 --- a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py +++ b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py @@ -413,7 +413,7 @@ def notify_entity_owner_by_email(session, mf_name, mf_uri, entity, recipient_gro ) subject = ( - f'ACTION REQUIRED: Data.all | Metadata form "{mf_name}" required for {entity["uri"]}' + f'ACTION REQUIRED: Data.all | Metadata form "{mf_name}" required for {entity["type"]} {entity["uri"]}' ) msg_intro = f"""Dear User,