-
Notifications
You must be signed in to change notification settings - Fork 92
GH-1810 metadata form enforcement email reminder #1838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1b6d5db
f8776c4
fec3a50
e1286fb
881e4d5
e7fb14c
cabdbdc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| import os | ||
| import logging | ||
| from typing import List | ||
| from dataall.base.context import get_context | ||
| from dataall.base.db import exceptions, Engine | ||
|
|
@@ -15,6 +17,7 @@ | |
| MetadataFormEnforcementScope, | ||
| MetadataFormEnforcementSeverity, | ||
| ENTITY_SCOPE_BY_TYPE, | ||
| ENTITY_LINK_MAP, | ||
| ) | ||
| from dataall.core.metadata_manager.metadata_form_entity_manager import ( | ||
| MetadataFormEntityTypes, | ||
|
|
@@ -31,6 +34,8 @@ | |
| ) | ||
| from dataall.modules.notifications.db.notification_repositories import NotificationRepository | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class MetadataFormEnforcementRequestValidationService: | ||
| @staticmethod | ||
|
|
@@ -71,17 +76,36 @@ def _get_entity_uri(session, data): | |
| return data.get('homeEntity') | ||
|
|
||
| @classmethod | ||
| def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str) -> bool: | ||
| def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str, mf_uri: str) -> bool: | ||
| affected_entities = MetadataFormEnforcementService._get_affected_entities(session=session, uri=rule_uri) | ||
| for entity in affected_entities: | ||
| if entity['owner']: | ||
| owner = entity.get('owner') | ||
|
|
||
| # Send in-app notification (popover) | ||
| if owner: | ||
| NotificationRepository.create_notification( | ||
| session, | ||
| recipient=entity['owner'], | ||
| recipient=owner, | ||
| target_uri=f'{entity["uri"]}|{entity["type"]}', | ||
| message=f'Usage of metadata form "{mf_name}" was enforced for {entity["uri"]} {entity["type"]}', | ||
| notification_type='METADATA_FORM_ENFORCED', | ||
| ) | ||
|
|
||
| # Send notification by email | ||
| try: | ||
| # skip if the owner is an individual user | ||
| if entity['attached'] is None and "@" not in owner: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this condition very specific? What 'individual user' means? May be we can put it in shared functions, so we could refer to it in other places as well? |
||
| MetadataFormEnforcementService.notify_entity_owner_by_email( | ||
| session=session, | ||
| mf_name=mf_name, | ||
| mf_uri=mf_uri, | ||
| entity=entity, | ||
| recipient_groups_list=[owner] | ||
| ) | ||
| except Exception as e: | ||
| log.warning(f"Skipping invalid or missing owner group {owner}: {e}") | ||
| continue | ||
|
|
||
| return True | ||
|
|
||
| @staticmethod | ||
|
|
@@ -98,7 +122,10 @@ def create_mf_enforcement_rule(uri, data): | |
| task = Task( | ||
| targetUri=rule.uri, | ||
| action='metadata_form.enforcement.notify', | ||
| payload={'mf_name': mf.name}, | ||
| payload={ | ||
| 'mf_name': mf.name, | ||
| 'mf_uri': mf.uri | ||
| }, | ||
| ) | ||
| session.add(task) | ||
| session.commit() | ||
|
|
@@ -358,3 +385,64 @@ def get_rules_that_affect_entity(entity_type, entity_uri): | |
| r.metadataFormName = MetadataFormRepository.get_metadata_form(session, r.metadataFormUri).name | ||
|
|
||
| return all_rules | ||
|
|
||
| @staticmethod | ||
| def notify_entity_owner_by_email(session, mf_name, mf_uri, entity, recipient_groups_list=None, recipient_email_ids=None): | ||
| """ | ||
| Sends a one-time notification to entity owners when a new enforcement rule is created and their entities are affected. | ||
| """ | ||
|
|
||
| try: | ||
| if recipient_groups_list is None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest |
||
| recipient_groups_list = [] | ||
| if recipient_email_ids is None: | ||
| recipient_email_ids = [] | ||
|
|
||
| entity_type = entity["type"] | ||
| if entity_type in ENTITY_LINK_MAP: | ||
| entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}' | ||
| else: | ||
| entity_link = f'/console/metadata-forms/{mf_uri}' | ||
|
|
||
| entity_link_text = '' | ||
| if os.environ.get('frontend_domain_url'): | ||
| entity_link_text = ( | ||
| f'<br><br>Please visit data.all <a href="{os.environ.get("frontend_domain_url")}' | ||
| f'{entity_link}">link</a> ' | ||
| f'to attach the required metadata form: "{mf_name}".' | ||
| ) | ||
|
|
||
| subject = ( | ||
| f'ACTION REQUIRED: Data.all | Metadata form "{mf_name}" required for {entity["type"]} {entity["uri"]}' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May be we can add entity name here. |
||
| ) | ||
|
|
||
| msg_intro = f"""Dear User, <br><br> | ||
| The metadata form "{mf_name}" is required for your resource: {entity["uri"]} ({entity["type"]}). | ||
| """ + entity_link_text | ||
|
|
||
| msg_end = """<br><br>Your prompt attention in this matter is greatly appreciated. | ||
| <br><br>Best regards, | ||
| <br>The Data.all Team | ||
| """ | ||
|
|
||
| msg = msg_intro + msg_end | ||
|
|
||
| notification_task: Task = Task( | ||
| action='notification.service', | ||
| targetUri=entity["uri"], | ||
| payload={ | ||
| 'notificationType': 'email', | ||
| 'subject': subject, | ||
| 'message': msg, | ||
| 'recipientGroupsList': recipient_groups_list, | ||
| 'recipientEmailList': recipient_email_ids, | ||
| }, | ||
| ) | ||
| session.add(notification_task) | ||
| session.commit() | ||
|
|
||
| Worker.queue(engine=get_context().db_engine, task_ids=[notification_task.taskUri]) | ||
|
|
||
| except Exception as e: | ||
| err_msg = f"Failed to send notification email to affected entity owners: {e}" | ||
| log.exception(err_msg) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Code of the long-running tasks that run in ECS""" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| import logging | ||
| import os | ||
| import sys | ||
| import json | ||
| from dataall.base.db import get_engine | ||
| from collections import defaultdict | ||
| from typing import Dict, DefaultDict | ||
| from dataall.base.loader import load_modules, ImportMode | ||
| from dataall.base.services.service_provider_factory import ServiceProviderFactory | ||
| from dataall.modules.metadata_forms.db.metadata_form_repository import MetadataFormRepository | ||
| from dataall.modules.metadata_forms.services.metadata_form_enforcement_service import MetadataFormEnforcementService | ||
| from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService | ||
| from dataall.modules.metadata_forms.db.enums import ENTITY_LINK_MAP | ||
|
|
||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _get_affected_entities_per_user(session) -> DefaultDict[str, Dict[str, Dict[str, str]]]: | ||
| """ | ||
| Iterate over all metadata forms to fetch all enforcement rules, | ||
| and for each enforcement rule, identify affected entities without a metadata form attached. | ||
|
|
||
| Gather entity owner information, resolve group owners to individual users, | ||
| and prepare a mapping of users to the unique entities they own that are affected. | ||
|
|
||
| Returns: | ||
| A defaultdict where: | ||
| - key = user email id (str) | ||
| - value = dict mapping entity URI (str) to the entity object (dict with fields like name, type, owner, attached) | ||
| """ | ||
|
|
||
| user_to_entities: DefaultDict[str, Dict[str, Dict[str, str]]] = defaultdict(dict) | ||
| identityProvider = ServiceProviderFactory.get_service_provider_instance() | ||
|
|
||
| # find all metadata forms | ||
| all_mfs = MetadataFormRepository.query_user_metadata_forms(session, is_da_admin=True, groups=None, env_uris=None, org_uris=None, filter=None).all() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This task is executed in Ecs and not so time-sensitive. But for issue #1807 this will be timesensitive issue.
|
||
| log.info(f'Found {len(all_mfs)} metadata forms') | ||
|
|
||
| # for a given form, get all enforcement rules | ||
| for mf in all_mfs: | ||
| log.info(f'Processing metadata form {mf.uri}') | ||
| mf_enforcement_rules = MetadataFormRepository.list_mf_enforcement_rules(session, mf.uri) | ||
|
|
||
| log.info(f'Found {len(mf_enforcement_rules)} enforcement rules for metadata form {mf.uri}') | ||
| for rule in mf_enforcement_rules: | ||
| log.info(f'Processing enforcement rule {rule.uri}') | ||
|
|
||
| affected_entities = MetadataFormEnforcementService.get_affected_entities(uri=rule.uri, session=session) | ||
| log.info(f'Found {len(affected_entities)} affected entities') | ||
|
|
||
| for entity in affected_entities: | ||
| if entity['attached'] is None: | ||
| if entity['owner']: | ||
| owner = entity["owner"] | ||
| log.info(f'Fetching members from entity owner {owner}') | ||
|
|
||
| try: | ||
| user_email_ids = identityProvider.get_user_emailids_from_group(groupName=owner) | ||
| except Exception as e: | ||
| # We consider individual owner as invalid for now | ||
| log.warning(f"Skipping invalid or missing owner group {owner}: {e}") | ||
| continue | ||
|
|
||
| if user_email_ids: | ||
| for email_id in user_email_ids: | ||
| user_to_entities[email_id][entity['uri']] = entity | ||
|
|
||
| return user_to_entities | ||
|
|
||
|
|
||
| def send_reminder_email(engine): | ||
| with engine.scoped_session() as session: | ||
| log.info('Running Metadata Form Enforcement Email Alert Task') | ||
|
|
||
| user_to_entities = _get_affected_entities_per_user(session) | ||
| for email_id, entities in user_to_entities.items(): | ||
| email_body = _construct_email_body(entities.values()) | ||
| log.debug(f'Sending email to user: {email_id} with email content: {email_body}') | ||
| subject = 'Action Required | Data.all metadata compliance digest' | ||
|
|
||
| try: | ||
| SESEmailNotificationService.send_email_task( | ||
| subject=subject, message=email_body, recipient_groups_list=[], recipient_email_list=[email_id] | ||
| ) | ||
| except Exception as e: | ||
| err_msg = f'Failed to send email in weekly metadata form enforcement reminder task due to: {e}' | ||
| log.exception(err_msg) | ||
|
|
||
| log.info('Completed Metadata Form Enforcement Email Alert Task') | ||
|
|
||
|
|
||
| def _construct_email_body(entities): | ||
|
|
||
| msg_heading = f"""Dear User, <br><br> | ||
|
|
||
| The following resources that you own in Data.all are missing one or more required metadata forms. | ||
| Please review them and attach the necessary metadata forms to ensure compliance.<br><br> | ||
| """ | ||
|
|
||
| msg_content = _create_table_for_resource(entities) | ||
|
|
||
| msg_footer = """Your prompt attention in this matter is greatly appreciated. | ||
| <br><br>Best regards, | ||
| <br>The Data.all Team | ||
| """ | ||
|
|
||
| return msg_heading + msg_content + msg_footer | ||
|
|
||
|
|
||
| def _create_table_for_resource(entities): | ||
| table_heading = """ | ||
| <tr> | ||
| <th align='center'> | ||
| Entity Type | ||
| </th> | ||
| <th align='center'> | ||
| Name | ||
| </th> | ||
| <th align='center'> | ||
| Link | ||
| </th> | ||
| </tr> | ||
| """ | ||
| table_body = """""" | ||
|
|
||
| for entity in entities: | ||
| entity_type = entity["type"] | ||
|
|
||
| # get entity link | ||
| if entity_type in ENTITY_LINK_MAP: | ||
| entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}' | ||
| entity_link_text = f'<a href="{os.environ.get("frontend_domain_url", "") + entity_link}">View Resource</a>' | ||
| else: | ||
| entity_link_text = "N/A" | ||
|
|
||
| table_body += f""" | ||
| <tr> | ||
| <td align='center'> | ||
| {entity['type']} | ||
| </td> | ||
| <td align='center'> | ||
| {entity['name']} | ||
| </td> | ||
| <td align='center'> | ||
| {entity_link_text} | ||
| </td> | ||
| </tr> | ||
| """ | ||
| table = f""" | ||
| <table border='1' style='border-collapse:collapse; width: 70%;'> | ||
| {table_heading} | ||
| {table_body} | ||
| </table> | ||
| <br> | ||
| <br> | ||
| """ | ||
|
|
||
| return table | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| load_modules(modes={ImportMode.API}) | ||
| ENVNAME = os.environ.get('envname', 'local') | ||
| ENGINE = get_engine(envname=ENVNAME) | ||
| send_reminder_email(engine=ENGINE) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any particular reason you need this exact new mapping? Why can't we reuse the existing one?