diff --git a/backend/dataall/modules/metadata_forms/db/enums.py b/backend/dataall/modules/metadata_forms/db/enums.py
index 9b5903af8..12cc57ff5 100644
--- a/backend/dataall/modules/metadata_forms/db/enums.py
+++ b/backend/dataall/modules/metadata_forms/db/enums.py
@@ -65,3 +65,18 @@ class MetadataFormUserRoles(GraphQLEnumMapper):
MetadataFormEntityTypes.Bucket.value: MetadataFormEnforcementScope.Dataset,
MetadataFormEntityTypes.Share.value: MetadataFormEnforcementScope.Dataset,
}
+
+ENTITY_LINK_MAP = {
+ MetadataFormEntityTypes.Organization.value: 'organizations',
+ MetadataFormEntityTypes.Environment.value: 'environments',
+ MetadataFormEntityTypes.S3Dataset.value: 's3-datasets',
+ MetadataFormEntityTypes.RDDataset.value: 'redshift-datasets',
+ MetadataFormEntityTypes.Worksheet.value: 'worksheets',
+ MetadataFormEntityTypes.Dashboard.value: 'dashboards',
+ MetadataFormEntityTypes.Notebook.value: 'notebooks',
+ MetadataFormEntityTypes.MLStudioUser.value: 'mlstudio',
+ MetadataFormEntityTypes.Pipeline.value: 'pipelines',
+ MetadataFormEntityTypes.Table.value: 's3-datasets/table',
+ MetadataFormEntityTypes.Folder.value: 's3-datasets/folder',
+ MetadataFormEntityTypes.Share.value: 'shares',
+}
diff --git a/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py b/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py
index 510647e95..26dd0a7ef 100644
--- a/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py
+++ b/backend/dataall/modules/metadata_forms/handlers/metadata_form_handler.py
@@ -9,5 +9,8 @@ class EcsMetadataFormHandler:
def notify_owners_of_enforcement(engine, task: Task):
with engine.scoped_session() as session:
MetadataFormEnforcementService.notify_owners_of_enforcement(
- session=session, rule_uri=task.targetUri, mf_name=task.payload.get('mf_name', 'Unknown name')
+ session=session,
+ rule_uri=task.targetUri,
+ mf_name=task.payload.get('mf_name', 'Unknown name'),
+ mf_uri=task.payload.get('mf_uri', 'Unknown uri')
)
diff --git a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py
index 650a173cc..f3ab3aef0 100644
--- a/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py
+++ b/backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py
@@ -1,3 +1,5 @@
+import os
+import logging
from typing import List
from dataall.base.context import get_context
from dataall.base.db import exceptions, Engine
@@ -15,6 +17,7 @@
MetadataFormEnforcementScope,
MetadataFormEnforcementSeverity,
ENTITY_SCOPE_BY_TYPE,
+ ENTITY_LINK_MAP,
)
from dataall.core.metadata_manager.metadata_form_entity_manager import (
MetadataFormEntityTypes,
@@ -31,6 +34,8 @@
)
from dataall.modules.notifications.db.notification_repositories import NotificationRepository
+log = logging.getLogger(__name__)
+
class MetadataFormEnforcementRequestValidationService:
@staticmethod
@@ -71,17 +76,36 @@ def _get_entity_uri(session, data):
return data.get('homeEntity')
@classmethod
- def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str) -> bool:
+ def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str, mf_uri: str) -> bool:
affected_entities = MetadataFormEnforcementService._get_affected_entities(session=session, uri=rule_uri)
for entity in affected_entities:
- if entity['owner']:
+ owner = entity.get('owner')
+
+ # Send in-app notification (popover)
+ if owner:
NotificationRepository.create_notification(
session,
- recipient=entity['owner'],
+ recipient=owner,
target_uri=f'{entity["uri"]}|{entity["type"]}',
message=f'Usage of metadata form "{mf_name}" was enforced for {entity["uri"]} {entity["type"]}',
notification_type='METADATA_FORM_ENFORCED',
)
+
+ # Send notification by email
+ try:
+ # skip if the owner is an individual user
+ if entity['attached'] is None and "@" not in owner:
+ MetadataFormEnforcementService.notify_entity_owner_by_email(
+ session=session,
+ mf_name=mf_name,
+ mf_uri=mf_uri,
+ entity=entity,
+ recipient_groups_list=[owner]
+ )
+ except Exception as e:
+ log.warning(f"Skipping invalid or missing owner group {owner}: {e}")
+ continue
+
return True
@staticmethod
@@ -98,7 +122,10 @@ def create_mf_enforcement_rule(uri, data):
task = Task(
targetUri=rule.uri,
action='metadata_form.enforcement.notify',
- payload={'mf_name': mf.name},
+ payload={
+ 'mf_name': mf.name,
+ 'mf_uri': mf.uri
+ },
)
session.add(task)
session.commit()
@@ -358,3 +385,64 @@ def get_rules_that_affect_entity(entity_type, entity_uri):
r.metadataFormName = MetadataFormRepository.get_metadata_form(session, r.metadataFormUri).name
return all_rules
+
+ @staticmethod
+ def notify_entity_owner_by_email(session, mf_name, mf_uri, entity, recipient_groups_list=None, recipient_email_ids=None):
+ """
+ Sends a one-time notification to entity owners when a new enforcement rule is created and their entities are affected.
+ """
+
+ try:
+ if recipient_groups_list is None:
+ recipient_groups_list = []
+ if recipient_email_ids is None:
+ recipient_email_ids = []
+
+ entity_type = entity["type"]
+ if entity_type in ENTITY_LINK_MAP:
+ entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}'
+ else:
+ entity_link = f'/console/metadata-forms/{mf_uri}'
+
+ entity_link_text = ''
+ if os.environ.get('frontend_domain_url'):
+ entity_link_text = (
+ f'
Please visit data.all link '
+ f'to attach the required metadata form: "{mf_name}".'
+ )
+
+ subject = (
+ f'ACTION REQUIRED: Data.all | Metadata form "{mf_name}" required for {entity["type"]} {entity["uri"]}'
+ )
+
+ msg_intro = f"""Dear User,
+ The metadata form "{mf_name}" is required for your resource: {entity["uri"]} ({entity["type"]}).
+ """ + entity_link_text
+
+ msg_end = """
Your prompt attention in this matter is greatly appreciated.
+
Best regards,
+
The Data.all Team
+ """
+
+ msg = msg_intro + msg_end
+
+ notification_task: Task = Task(
+ action='notification.service',
+ targetUri=entity["uri"],
+ payload={
+ 'notificationType': 'email',
+ 'subject': subject,
+ 'message': msg,
+ 'recipientGroupsList': recipient_groups_list,
+ 'recipientEmailList': recipient_email_ids,
+ },
+ )
+ session.add(notification_task)
+ session.commit()
+
+ Worker.queue(engine=get_context().db_engine, task_ids=[notification_task.taskUri])
+
+ except Exception as e:
+ err_msg = f"Failed to send notification email to affected entity owners: {e}"
+ log.exception(err_msg)
diff --git a/backend/dataall/modules/metadata_forms/tasks/__init__.py b/backend/dataall/modules/metadata_forms/tasks/__init__.py
new file mode 100644
index 000000000..da597f309
--- /dev/null
+++ b/backend/dataall/modules/metadata_forms/tasks/__init__.py
@@ -0,0 +1 @@
+"""Code of the long-running tasks that run in ECS"""
diff --git a/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py
new file mode 100644
index 000000000..bbd397c9f
--- /dev/null
+++ b/backend/dataall/modules/metadata_forms/tasks/metadata_form_enforcement_email_alerts_task.py
@@ -0,0 +1,166 @@
+import logging
+import os
+import sys
+import json
+from dataall.base.db import get_engine
+from collections import defaultdict
+from typing import Dict, DefaultDict
+from dataall.base.loader import load_modules, ImportMode
+from dataall.base.services.service_provider_factory import ServiceProviderFactory
+from dataall.modules.metadata_forms.db.metadata_form_repository import MetadataFormRepository
+from dataall.modules.metadata_forms.services.metadata_form_enforcement_service import MetadataFormEnforcementService
+from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService
+from dataall.modules.metadata_forms.db.enums import ENTITY_LINK_MAP
+
+
+log = logging.getLogger(__name__)
+
+
+def _get_affected_entities_per_user(session) -> DefaultDict[str, Dict[str, Dict[str, str]]]:
+ """
+ Iterate over all metadata forms to fetch all enforcement rules,
+ and for each enforcement rule, identify affected entities without a metadata form attached.
+
+ Gather entity owner information, resolve group owners to individual users,
+ and prepare a mapping of users to the unique entities they own that are affected.
+
+ Returns:
+ A defaultdict where:
+ - key = user email id (str)
+ - value = dict mapping entity URI (str) to the entity object (dict with fields like name, type, owner, attached)
+ """
+
+ user_to_entities: DefaultDict[str, Dict[str, Dict[str, str]]] = defaultdict(dict)
+ identityProvider = ServiceProviderFactory.get_service_provider_instance()
+
+ # find all metadata forms
+ all_mfs = MetadataFormRepository.query_user_metadata_forms(session, is_da_admin=True, groups=None, env_uris=None, org_uris=None, filter=None).all()
+ log.info(f'Found {len(all_mfs)} metadata forms')
+
+ # for a given form, get all enforcement rules
+ for mf in all_mfs:
+ log.info(f'Processing metadata form {mf.uri}')
+ mf_enforcement_rules = MetadataFormRepository.list_mf_enforcement_rules(session, mf.uri)
+
+ log.info(f'Found {len(mf_enforcement_rules)} enforcement rules for metadata form {mf.uri}')
+ for rule in mf_enforcement_rules:
+ log.info(f'Processing enforcement rule {rule.uri}')
+
+ affected_entities = MetadataFormEnforcementService.get_affected_entities(uri=rule.uri, session=session)
+ log.info(f'Found {len(affected_entities)} affected entities')
+
+ for entity in affected_entities:
+ if entity['attached'] is None:
+ if entity['owner']:
+ owner = entity["owner"]
+ log.info(f'Fetching members from entity owner {owner}')
+
+ try:
+ user_email_ids = identityProvider.get_user_emailids_from_group(groupName=owner)
+ except Exception as e:
+ # We consider individual owner as invalid for now
+ log.warning(f"Skipping invalid or missing owner group {owner}: {e}")
+ continue
+
+ if user_email_ids:
+ for email_id in user_email_ids:
+ user_to_entities[email_id][entity['uri']] = entity
+
+ return user_to_entities
+
+
+def send_reminder_email(engine):
+ with engine.scoped_session() as session:
+ log.info('Running Metadata Form Enforcement Email Alert Task')
+
+ user_to_entities = _get_affected_entities_per_user(session)
+ for email_id, entities in user_to_entities.items():
+ email_body = _construct_email_body(entities.values())
+ log.debug(f'Sending email to user: {email_id} with email content: {email_body}')
+ subject = 'Action Required | Data.all metadata compliance digest'
+
+ try:
+ SESEmailNotificationService.send_email_task(
+ subject=subject, message=email_body, recipient_groups_list=[], recipient_email_list=[email_id]
+ )
+ except Exception as e:
+ err_msg = f'Failed to send email in weekly metadata form enforcement reminder task due to: {e}'
+ log.exception(err_msg)
+
+ log.info('Completed Metadata Form Enforcement Email Alert Task')
+
+
+def _construct_email_body(entities):
+
+ msg_heading = f"""Dear User,
+
+ The following resources that you own in Data.all are missing one or more required metadata forms.
+ Please review them and attach the necessary metadata forms to ensure compliance.
+ """
+
+ msg_content = _create_table_for_resource(entities)
+
+ msg_footer = """Your prompt attention in this matter is greatly appreciated.
+
Best regards,
+
The Data.all Team
+ """
+
+ return msg_heading + msg_content + msg_footer
+
+
+def _create_table_for_resource(entities):
+ table_heading = """
+