Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions backend/dataall/modules/metadata_forms/db/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,18 @@ class MetadataFormUserRoles(GraphQLEnumMapper):
MetadataFormEntityTypes.Bucket.value: MetadataFormEnforcementScope.Dataset,
MetadataFormEntityTypes.Share.value: MetadataFormEnforcementScope.Dataset,
}

ENTITY_LINK_MAP = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any particular reason you need this exact new mapping? Why can't we reuse the existing one?

MetadataFormEntityTypes.Organization.value: 'organizations',
MetadataFormEntityTypes.Environment.value: 'environments',
MetadataFormEntityTypes.S3Dataset.value: 's3-datasets',
MetadataFormEntityTypes.RDDataset.value: 'redshift-datasets',
MetadataFormEntityTypes.Worksheet.value: 'worksheets',
MetadataFormEntityTypes.Dashboard.value: 'dashboards',
MetadataFormEntityTypes.Notebook.value: 'notebooks',
MetadataFormEntityTypes.MLStudioUser.value: 'mlstudio',
MetadataFormEntityTypes.Pipeline.value: 'pipelines',
MetadataFormEntityTypes.Table.value: 's3-datasets/table',
MetadataFormEntityTypes.Folder.value: 's3-datasets/folder',
MetadataFormEntityTypes.Share.value: 'shares',
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@ class EcsMetadataFormHandler:
def notify_owners_of_enforcement(engine, task: Task):
with engine.scoped_session() as session:
MetadataFormEnforcementService.notify_owners_of_enforcement(
session=session, rule_uri=task.targetUri, mf_name=task.payload.get('mf_name', 'Unknown name')
session=session,
rule_uri=task.targetUri,
mf_name=task.payload.get('mf_name', 'Unknown name'),
mf_uri=task.payload.get('mf_uri', 'Unknown uri')
)
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import logging
from typing import List
from dataall.base.context import get_context
from dataall.base.db import exceptions, Engine
Expand All @@ -15,6 +17,7 @@
MetadataFormEnforcementScope,
MetadataFormEnforcementSeverity,
ENTITY_SCOPE_BY_TYPE,
ENTITY_LINK_MAP,
)
from dataall.core.metadata_manager.metadata_form_entity_manager import (
MetadataFormEntityTypes,
Expand All @@ -31,6 +34,8 @@
)
from dataall.modules.notifications.db.notification_repositories import NotificationRepository

log = logging.getLogger(__name__)


class MetadataFormEnforcementRequestValidationService:
@staticmethod
Expand Down Expand Up @@ -71,17 +76,36 @@ def _get_entity_uri(session, data):
return data.get('homeEntity')

@classmethod
def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str) -> bool:
def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str, mf_uri: str) -> bool:
affected_entities = MetadataFormEnforcementService._get_affected_entities(session=session, uri=rule_uri)
for entity in affected_entities:
if entity['owner']:
owner = entity.get('owner')

# Send in-app notification (popover)
if owner:
NotificationRepository.create_notification(
session,
recipient=entity['owner'],
recipient=owner,
target_uri=f'{entity["uri"]}|{entity["type"]}',
message=f'Usage of metadata form "{mf_name}" was enforced for {entity["uri"]} {entity["type"]}',
notification_type='METADATA_FORM_ENFORCED',
)

# Send notification by email
try:
# skip if the owner is an individual user
if entity['attached'] is None and "@" not in owner:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this condition very specific? What 'individual user' means? May be we can put it in shared functions, so we could refer to it in other places as well?

MetadataFormEnforcementService.notify_entity_owner_by_email(
session=session,
mf_name=mf_name,
mf_uri=mf_uri,
entity=entity,
recipient_groups_list=[owner]
)
except Exception as e:
log.warning(f"Skipping invalid or missing owner group {owner}: {e}")
continue

return True

@staticmethod
Expand All @@ -98,7 +122,10 @@ def create_mf_enforcement_rule(uri, data):
task = Task(
targetUri=rule.uri,
action='metadata_form.enforcement.notify',
payload={'mf_name': mf.name},
payload={
'mf_name': mf.name,
'mf_uri': mf.uri
},
)
session.add(task)
session.commit()
Expand Down Expand Up @@ -358,3 +385,64 @@ def get_rules_that_affect_entity(entity_type, entity_uri):
r.metadataFormName = MetadataFormRepository.get_metadata_form(session, r.metadataFormUri).name

return all_rules

@staticmethod
def notify_entity_owner_by_email(session, mf_name, mf_uri, entity, recipient_groups_list=None, recipient_email_ids=None):
"""
Sends a one-time notification to entity owners when a new enforcement rule is created and their entities are affected.
"""

try:
if recipient_groups_list is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest

recipient_groups_list = recipient_groups_list or []
recipient_email_ids = recipient_email_ids or []

recipient_groups_list = []
if recipient_email_ids is None:
recipient_email_ids = []

entity_type = entity["type"]
if entity_type in ENTITY_LINK_MAP:
entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}'
else:
entity_link = f'/console/metadata-forms/{mf_uri}'

entity_link_text = ''
if os.environ.get('frontend_domain_url'):
entity_link_text = (
f'<br><br>Please visit data.all <a href="{os.environ.get("frontend_domain_url")}'
f'{entity_link}">link</a> '
f'to attach the required metadata form: "{mf_name}".'
)

subject = (
f'ACTION REQUIRED: Data.all | Metadata form "{mf_name}" required for {entity["type"]} {entity["uri"]}'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be we can add entity name here.
Smth like 'dataset TestName (uri)'

)

msg_intro = f"""Dear User, <br><br>
The metadata form "{mf_name}" is required for your resource: {entity["uri"]} ({entity["type"]}).
""" + entity_link_text

msg_end = """<br><br>Your prompt attention in this matter is greatly appreciated.
<br><br>Best regards,
<br>The Data.all Team
"""

msg = msg_intro + msg_end

notification_task: Task = Task(
action='notification.service',
targetUri=entity["uri"],
payload={
'notificationType': 'email',
'subject': subject,
'message': msg,
'recipientGroupsList': recipient_groups_list,
'recipientEmailList': recipient_email_ids,
},
)
session.add(notification_task)
session.commit()

Worker.queue(engine=get_context().db_engine, task_ids=[notification_task.taskUri])

except Exception as e:
err_msg = f"Failed to send notification email to affected entity owners: {e}"
log.exception(err_msg)
1 change: 1 addition & 0 deletions backend/dataall/modules/metadata_forms/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Code of the long-running tasks that run in ECS"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import logging
import os
import sys
import json
from dataall.base.db import get_engine
from collections import defaultdict
from typing import Dict, DefaultDict
from dataall.base.loader import load_modules, ImportMode
from dataall.base.services.service_provider_factory import ServiceProviderFactory
from dataall.modules.metadata_forms.db.metadata_form_repository import MetadataFormRepository
from dataall.modules.metadata_forms.services.metadata_form_enforcement_service import MetadataFormEnforcementService
from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService
from dataall.modules.metadata_forms.db.enums import ENTITY_LINK_MAP


log = logging.getLogger(__name__)


def _get_affected_entities_per_user(session) -> DefaultDict[str, Dict[str, Dict[str, str]]]:
"""
Iterate over all metadata forms to fetch all enforcement rules,
and for each enforcement rule, identify affected entities without a metadata form attached.

Gather entity owner information, resolve group owners to individual users,
and prepare a mapping of users to the unique entities they own that are affected.

Returns:
A defaultdict where:
- key = user email id (str)
- value = dict mapping entity URI (str) to the entity object (dict with fields like name, type, owner, attached)
"""

user_to_entities: DefaultDict[str, Dict[str, Dict[str, str]]] = defaultdict(dict)
identityProvider = ServiceProviderFactory.get_service_provider_instance()

# find all metadata forms
all_mfs = MetadataFormRepository.query_user_metadata_forms(session, is_da_admin=True, groups=None, env_uris=None, org_uris=None, filter=None).all()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task is executed in Ecs and not so time-sensitive. But for issue #1807 this will be timesensitive issue.

  1. I would place this function in mf_enforcement_service since it performs business logic and can be reused
  2. Maybe we should think about creating a view in DB, where all unattached entities will be stored. It can speed up such operations

log.info(f'Found {len(all_mfs)} metadata forms')

# for a given form, get all enforcement rules
for mf in all_mfs:
log.info(f'Processing metadata form {mf.uri}')
mf_enforcement_rules = MetadataFormRepository.list_mf_enforcement_rules(session, mf.uri)

log.info(f'Found {len(mf_enforcement_rules)} enforcement rules for metadata form {mf.uri}')
for rule in mf_enforcement_rules:
log.info(f'Processing enforcement rule {rule.uri}')

affected_entities = MetadataFormEnforcementService.get_affected_entities(uri=rule.uri, session=session)
log.info(f'Found {len(affected_entities)} affected entities')

for entity in affected_entities:
if entity['attached'] is None:
if entity['owner']:
owner = entity["owner"]
log.info(f'Fetching members from entity owner {owner}')

try:
user_email_ids = identityProvider.get_user_emailids_from_group(groupName=owner)
except Exception as e:
# We consider individual owner as invalid for now
log.warning(f"Skipping invalid or missing owner group {owner}: {e}")
continue

if user_email_ids:
for email_id in user_email_ids:
user_to_entities[email_id][entity['uri']] = entity

return user_to_entities


def send_reminder_email(engine):
with engine.scoped_session() as session:
log.info('Running Metadata Form Enforcement Email Alert Task')

user_to_entities = _get_affected_entities_per_user(session)
for email_id, entities in user_to_entities.items():
email_body = _construct_email_body(entities.values())
log.debug(f'Sending email to user: {email_id} with email content: {email_body}')
subject = 'Action Required | Data.all metadata compliance digest'

try:
SESEmailNotificationService.send_email_task(
subject=subject, message=email_body, recipient_groups_list=[], recipient_email_list=[email_id]
)
except Exception as e:
err_msg = f'Failed to send email in weekly metadata form enforcement reminder task due to: {e}'
log.exception(err_msg)

log.info('Completed Metadata Form Enforcement Email Alert Task')


def _construct_email_body(entities):

msg_heading = f"""Dear User, <br><br>

The following resources that you own in Data.all are missing one or more required metadata forms.
Please review them and attach the necessary metadata forms to ensure compliance.<br><br>
"""

msg_content = _create_table_for_resource(entities)

msg_footer = """Your prompt attention in this matter is greatly appreciated.
<br><br>Best regards,
<br>The Data.all Team
"""

return msg_heading + msg_content + msg_footer


def _create_table_for_resource(entities):
table_heading = """
<tr>
<th align='center'>
Entity Type
</th>
<th align='center'>
Name
</th>
<th align='center'>
Link
</th>
</tr>
"""
table_body = """"""

for entity in entities:
entity_type = entity["type"]

# get entity link
if entity_type in ENTITY_LINK_MAP:
entity_link = f'/console/{ENTITY_LINK_MAP[entity_type]}/{entity["uri"]}'
entity_link_text = f'<a href="{os.environ.get("frontend_domain_url", "") + entity_link}">View Resource</a>'
else:
entity_link_text = "N/A"

table_body += f"""
<tr>
<td align='center'>
{entity['type']}
</td>
<td align='center'>
{entity['name']}
</td>
<td align='center'>
{entity_link_text}
</td>
</tr>
"""
table = f"""
<table border='1' style='border-collapse:collapse; width: 70%;'>
{table_heading}
{table_body}
</table>
<br>
<br>
"""

return table


if __name__ == '__main__':
load_modules(modes={ImportMode.API})
ENVNAME = os.environ.get('envname', 'local')
ENGINE = get_engine(envname=ENVNAME)
send_reminder_email(engine=ENGINE)
29 changes: 29 additions & 0 deletions deploy/stacks/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def __init__(
self.add_omics_fetch_workflows_task()
self.add_persistent_email_reminders_task()
self.add_share_expiration_task()
self.add_metadata_form_enforcement_email_reminders_task()

@run_if(['modules.s3_datasets.active', 'modules.dashboards.active'])
def add_catalog_indexer_task(self):
Expand Down Expand Up @@ -500,6 +501,34 @@ def add_share_expiration_task(self):
)
self.ecs_task_definitions_families.append(scheduled_task.task_definition.family)

@run_if(['modules.s3_datasets.active', 'modules.metadata_forms.active'])
def add_metadata_form_enforcement_email_reminders_task(self):
self.add_custom_config_context(custom_auth=self._custom_auth)
mf_enforcement_email_reminders_task, mf_enforcement_email_reminders_task_def = self.set_scheduled_task(
cluster=self.ecs_cluster,
command=[
'python3.9',
'-m',
'dataall.modules.metadata_forms.tasks.metadata_form_enforcement_email_alerts_task',
],
container_id='container',
ecr_repository=self._ecr_repository,
environment=self.env_vars,
image_tag=self._cdkproxy_image_tag,
log_group=self.create_log_group(
self._envname, self._resource_prefix, log_group_name='metadata-form-email-reminders'
),
schedule_expression=Schedule.expression('cron(0 15 ? * 2 *)'), # Run at 3PM UTC (10AM CST) every Monday
scheduled_task_id=f'{self._resource_prefix}-{self._envname}-metadata-form-email-reminders-schedule',
task_id=f'{self._resource_prefix}-{self._envname}-metadata-form-email-reminders',
task_role=self.task_role,
vpc=self._vpc,
security_group=self.scheduled_tasks_sg,
prod_sizing=self._prod_sizing,
)

self.ecs_task_definitions_families.append(mf_enforcement_email_reminders_task.task_definition.family)

def create_ecs_security_groups(self, envname, resource_prefix, vpc, vpce_connection, s3_prefix_list, lambdas):
scheduled_tasks_sg = ec2.SecurityGroup(
self,
Expand Down