diff --git a/backend/dataall/modules/metadata_forms/db/enums.py b/backend/dataall/modules/metadata_forms/db/enums.py index 9b5903af8..12cc57ff5 100644 --- a/backend/dataall/modules/metadata_forms/db/enums.py +++ b/backend/dataall/modules/metadata_forms/db/enums.py @@ -65,3 +65,18 @@ class MetadataFormUserRoles(GraphQLEnumMapper): MetadataFormEntityTypes.Bucket.value: MetadataFormEnforcementScope.Dataset, MetadataFormEntityTypes.Share.value: MetadataFormEnforcementScope.Dataset, } + +ENTITY_LINK_MAP = { + MetadataFormEntityTypes.Organization.value: 'organizations', + MetadataFormEntityTypes.Environment.value: 'environments', + MetadataFormEntityTypes.S3Dataset.value: 's3-datasets', + MetadataFormEntityTypes.RDDataset.value: 'redshift-datasets', + MetadataFormEntityTypes.Worksheet.value: 'worksheets', + MetadataFormEntityTypes.Dashboard.value: 'dashboards', + MetadataFormEntityTypes.Notebook.value: 'notebooks', + MetadataFormEntityTypes.MLStudioUser.value: 'mlstudio', + MetadataFormEntityTypes.Pipeline.value: 'pipelines', + MetadataFormEntityTypes.Table.value: 's3-datasets/table', + MetadataFormEntityTypes.Folder.value: 's3-datasets/folder', + MetadataFormEntityTypes.Share.value: 'shares', +} diff --git a/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py b/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py index 510647e95..26dd0a7ef 100644 --- a/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py +++ b/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py @@ -9,5 +9,8 @@ class EcsMetadataFormHandler: def notify_owners_of_enforcement(engine, task: Task): with engine.scoped_session() as session: MetadataFormEnforcementService.notify_owners_of_enforcement( - session=session, rule_uri=task.targetUri, mf_name=task.payload.get('mf_name', 'Unknown name') + session=session, + rule_uri=task.targetUri, + mf_name=task.payload.get('mf_name', 'Unknown name'), + mf_uri=task.payload.get('mf_uri', 'Unknown uri') ) diff --git a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py index 650a173cc..f3ab3aef0 100644 --- a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py +++ b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py @@ -1,3 +1,5 @@ +import os +import logging from typing import List from dataall.base.context import get_context from dataall.base.db import exceptions, Engine @@ -15,6 +17,7 @@ MetadataFormEnforcementScope, MetadataFormEnforcementSeverity, ENTITY_SCOPE_BY_TYPE, + ENTITY_LINK_MAP, ) from dataall.core.metadata_manager.metadata_form_entity_manager import ( MetadataFormEntityTypes, @@ -31,6 +34,8 @@ ) from dataall.modules.notifications.db.notification_repositories import NotificationRepository +log = logging.getLogger(__name__) + class MetadataFormEnforcementRequestValidationService: @staticmethod @@ -71,17 +76,36 @@ def _get_entity_uri(session, data): return data.get('homeEntity') @classmethod - def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str) -> bool: + def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str, mf_uri: str) -> bool: affected_entities = MetadataFormEnforcementService._get_affected_entities(session=session, uri=rule_uri) for entity in affected_entities: - if entity['owner']: + owner = entity.get('owner') + + # Send in-app notification (popover) + if owner: NotificationRepository.create_notification( session, - recipient=entity['owner'], + recipient=owner, target_uri=f'{entity["uri"]}|{entity["type"]}', message=f'Usage of metadata form "{mf_name}" was enforced for {entity["uri"]} {entity["type"]}', notification_type='METADATA_FORM_ENFORCED', ) + + # Send notification by email + try: + # skip if the owner is an individual user + if entity['attached'] is None and "@" not in owner: + MetadataFormEnforcementService.notify_entity_owner_by_email( + session=session, + mf_name=mf_name, + mf_uri=mf_uri, + entity=entity, + recipient_groups_list=[owner] + ) + except Exception as e: + log.warning(f"Skipping invalid or missing owner group {owner}: {e}") + continue + return True @staticmethod @@ -98,7 +122,10 @@ def create_mf_enforcement_rule(uri, data): task = Task( targetUri=rule.uri, action='metadata_form.enforcement.notify', - payload={'mf_name': mf.name}, + payload={ + 'mf_name': mf.name, + 'mf_uri': mf.uri + }, ) session.add(task) session.commit() @@ -358,3 +385,64 @@ def get_rules_that_affect_entity(entity_type, entity_uri): r.metadataFormName = MetadataFormRepository.get_metadata_form(session, r.metadataFormUri).name return all_rules + + @staticmethod + def notify_entity_owner_by_email(session, mf_name, mf_uri, entity, recipient_groups_list=None, recipient_email_ids=None): + """ + Sends a one-time notification to entity owners when a new enforcement rule is created and their entities are affected. + """ + + try: + if recipient_groups_list is None: + recipient_groups_list = [] + if recipient_email_ids is None: + recipient_email_ids = [] + + entity_type = entity["type"] + if entity_type in ENTITY_LINK_MAP: + entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}' + else: + entity_link = f'/console/metadata-forms/{mf_uri}' + + entity_link_text = '' + if os.environ.get('frontend_domain_url'): + entity_link_text = ( + f'

Please visit data.all link ' + f'to attach the required metadata form: "{mf_name}".' + ) + + subject = ( + f'ACTION REQUIRED: Data.all | Metadata form "{mf_name}" required for {entity["type"]} {entity["uri"]}' + ) + + msg_intro = f"""Dear User,

+ The metadata form "{mf_name}" is required for your resource: {entity["uri"]} ({entity["type"]}). + """ + entity_link_text + + msg_end = """

Your prompt attention in this matter is greatly appreciated. +

Best regards, +
The Data.all Team + """ + + msg = msg_intro + msg_end + + notification_task: Task = Task( + action='notification.service', + targetUri=entity["uri"], + payload={ + 'notificationType': 'email', + 'subject': subject, + 'message': msg, + 'recipientGroupsList': recipient_groups_list, + 'recipientEmailList': recipient_email_ids, + }, + ) + session.add(notification_task) + session.commit() + + Worker.queue(engine=get_context().db_engine, task_ids=[notification_task.taskUri]) + + except Exception as e: + err_msg = f"Failed to send notification email to affected entity owners: {e}" + log.exception(err_msg) diff --git a/backend/dataall/modules/metadata_forms/tasks/__init__.py b/backend/dataall/modules/metadata_forms/tasks/__init__.py new file mode 100644 index 000000000..da597f309 --- /dev/null +++ b/backend/dataall/modules/metadata_forms/tasks/__init__.py @@ -0,0 +1 @@ +"""Code of the long-running tasks that run in ECS""" diff --git a/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py new file mode 100644 index 000000000..bbd397c9f --- /dev/null +++ b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py @@ -0,0 +1,166 @@ +import logging +import os +import sys +import json +from dataall.base.db import get_engine +from collections import defaultdict +from typing import Dict, DefaultDict +from dataall.base.loader import load_modules, ImportMode +from dataall.base.services.service_provider_factory import ServiceProviderFactory +from dataall.modules.metadata_forms.db.metadata_form_repository import MetadataFormRepository +from dataall.modules.metadata_forms.services.metadata_form_enforcement_service import MetadataFormEnforcementService +from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService +from dataall.modules.metadata_forms.db.enums import ENTITY_LINK_MAP + + +log = logging.getLogger(__name__) + + +def _get_affected_entities_per_user(session) -> DefaultDict[str, Dict[str, Dict[str, str]]]: + """ + Iterate over all metadata forms to fetch all enforcement rules, + and for each enforcement rule, identify affected entities without a metadata form attached. + + Gather entity owner information, resolve group owners to individual users, + and prepare a mapping of users to the unique entities they own that are affected. + + Returns: + A defaultdict where: + - key = user email id (str) + - value = dict mapping entity URI (str) to the entity object (dict with fields like name, type, owner, attached) + """ + + user_to_entities: DefaultDict[str, Dict[str, Dict[str, str]]] = defaultdict(dict) + identityProvider = ServiceProviderFactory.get_service_provider_instance() + + # find all metadata forms + all_mfs = MetadataFormRepository.query_user_metadata_forms(session, is_da_admin=True, groups=None, env_uris=None, org_uris=None, filter=None).all() + log.info(f'Found {len(all_mfs)} metadata forms') + + # for a given form, get all enforcement rules + for mf in all_mfs: + log.info(f'Processing metadata form {mf.uri}') + mf_enforcement_rules = MetadataFormRepository.list_mf_enforcement_rules(session, mf.uri) + + log.info(f'Found {len(mf_enforcement_rules)} enforcement rules for metadata form {mf.uri}') + for rule in mf_enforcement_rules: + log.info(f'Processing enforcement rule {rule.uri}') + + affected_entities = MetadataFormEnforcementService.get_affected_entities(uri=rule.uri, session=session) + log.info(f'Found {len(affected_entities)} affected entities') + + for entity in affected_entities: + if entity['attached'] is None: + if entity['owner']: + owner = entity["owner"] + log.info(f'Fetching members from entity owner {owner}') + + try: + user_email_ids = identityProvider.get_user_emailids_from_group(groupName=owner) + except Exception as e: + # We consider individual owner as invalid for now + log.warning(f"Skipping invalid or missing owner group {owner}: {e}") + continue + + if user_email_ids: + for email_id in user_email_ids: + user_to_entities[email_id][entity['uri']] = entity + + return user_to_entities + + +def send_reminder_email(engine): + with engine.scoped_session() as session: + log.info('Running Metadata Form Enforcement Email Alert Task') + + user_to_entities = _get_affected_entities_per_user(session) + for email_id, entities in user_to_entities.items(): + email_body = _construct_email_body(entities.values()) + log.debug(f'Sending email to user: {email_id} with email content: {email_body}') + subject = 'Action Required | Data.all metadata compliance digest' + + try: + SESEmailNotificationService.send_email_task( + subject=subject, message=email_body, recipient_groups_list=[], recipient_email_list=[email_id] + ) + except Exception as e: + err_msg = f'Failed to send email in weekly metadata form enforcement reminder task due to: {e}' + log.exception(err_msg) + + log.info('Completed Metadata Form Enforcement Email Alert Task') + + +def _construct_email_body(entities): + + msg_heading = f"""Dear User,

+ + The following resources that you own in Data.all are missing one or more required metadata forms. + Please review them and attach the necessary metadata forms to ensure compliance.

+ """ + + msg_content = _create_table_for_resource(entities) + + msg_footer = """Your prompt attention in this matter is greatly appreciated. +

Best regards, +
The Data.all Team + """ + + return msg_heading + msg_content + msg_footer + + +def _create_table_for_resource(entities): + table_heading = """ + + + Entity Type + + + Name + + + Link + + + """ + table_body = """""" + + for entity in entities: + entity_type = entity["type"] + + # get entity link + if entity_type in ENTITY_LINK_MAP: + entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}' + entity_link_text = f'View Resource' + else: + entity_link_text = "N/A" + + table_body += f""" + + + {entity['type']} + + + {entity['name']} + + + {entity_link_text} + + + """ + table = f""" + + {table_heading} + {table_body} +
+
+
+ """ + + return table + + +if __name__ == '__main__': + load_modules(modes={ImportMode.API}) + ENVNAME = os.environ.get('envname', 'local') + ENGINE = get_engine(envname=ENVNAME) + send_reminder_email(engine=ENGINE) diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index a2a042302..d71d27b4c 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -208,6 +208,7 @@ def __init__( self.add_omics_fetch_workflows_task() self.add_persistent_email_reminders_task() self.add_share_expiration_task() + self.add_metadata_form_enforcement_email_reminders_task() @run_if(['modules.s3_datasets.active', 'modules.dashboards.active']) def add_catalog_indexer_task(self): @@ -500,6 +501,34 @@ def add_share_expiration_task(self): ) self.ecs_task_definitions_families.append(scheduled_task.task_definition.family) + @run_if(['modules.s3_datasets.active', 'modules.metadata_forms.active']) + def add_metadata_form_enforcement_email_reminders_task(self): + self.add_custom_config_context(custom_auth=self._custom_auth) + mf_enforcement_email_reminders_task, mf_enforcement_email_reminders_task_def = self.set_scheduled_task( + cluster=self.ecs_cluster, + command=[ + 'python3.9', + '-m', + 'dataall.modules.metadata_forms.tasks.metadata_form_enforcement_email_alerts_task', + ], + container_id='container', + ecr_repository=self._ecr_repository, + environment=self.env_vars, + image_tag=self._cdkproxy_image_tag, + log_group=self.create_log_group( + self._envname, self._resource_prefix, log_group_name='metadata-form-email-reminders' + ), + schedule_expression=Schedule.expression('cron(0 15 ? * 2 *)'), # Run at 3PM UTC (10AM CST) every Monday + scheduled_task_id=f'{self._resource_prefix}-{self._envname}-metadata-form-email-reminders-schedule', + task_id=f'{self._resource_prefix}-{self._envname}-metadata-form-email-reminders', + task_role=self.task_role, + vpc=self._vpc, + security_group=self.scheduled_tasks_sg, + prod_sizing=self._prod_sizing, + ) + + self.ecs_task_definitions_families.append(mf_enforcement_email_reminders_task.task_definition.family) + def create_ecs_security_groups(self, envname, resource_prefix, vpc, vpce_connection, s3_prefix_list, lambdas): scheduled_tasks_sg = ec2.SecurityGroup( self,