From 9dfd1775f938c628e4df8c77f925b641c8308f4b Mon Sep 17 00:00:00 2001 From: trajopadhye Date: Thu, 1 May 2025 17:21:53 -0500 Subject: [PATCH 1/3] Changed to logic in share reapplying --- .../attached_metadata_form_service.py | 1 + .../db/share_object_repositories.py | 12 +++ .../shares_base/handlers/ecs_share_handler.py | 4 +- .../shares_base/tasks/share_reapplier_task.py | 93 ++++++++++++------- .../shares_base/tasks/share_verifier_task.py | 19 +++- 5 files changed, 88 insertions(+), 41 deletions(-) diff --git a/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py b/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py index 91ae4a504..6af07e57c 100644 --- a/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py +++ b/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py @@ -6,6 +6,7 @@ from dataall.modules.metadata_forms.services.metadata_form_permissions import ATTACH_METADATA_FORM + class AttachedMetadataFormValidationService: @staticmethod def validate_filled_form_params(uri, data): diff --git a/backend/dataall/modules/shares_base/db/share_object_repositories.py b/backend/dataall/modules/shares_base/db/share_object_repositories.py index 1ec4abf78..23d3f6cee 100644 --- a/backend/dataall/modules/shares_base/db/share_object_repositories.py +++ b/backend/dataall/modules/shares_base/db/share_object_repositories.py @@ -196,6 +196,18 @@ def get_share_data_items_by_type(session, share, share_type_model, share_type_ur query = query.filter(ShareObjectItem.healthStatus == healthStatus) return query.all() + @staticmethod + def get_share_object_with_health_status(session, healthStatus=None): + query = ( + session.query(ShareObject) + .join( + ShareObject, + ShareObject.shareUri == ShareObjectItem.shareUri, + ) + .filter(ShareObjectItem.healthStatus == healthStatus) + ) + return query.all() + @staticmethod def get_all_share_items_in_share(session, share_uri, status=None, healthStatus=None): query = ( diff --git a/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py b/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py index a064d4c30..8af75116a 100644 --- a/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py +++ b/backend/dataall/modules/shares_base/handlers/ecs_share_handler.py @@ -42,10 +42,10 @@ def cleanup_share(engine, task: Task): def reapply_shares_of_dataset(engine, task: Task): envname = os.environ.get('envname', 'local') if envname in ['local', 'dkrcompose']: - EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, task.targetUri) + EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, [task.targetUri]) else: context = [ - {'name': 'datasetUri', 'value': task.targetUri}, + {'name': 'datasetUris', 'value': [task.targetUri]}, ] return EcsShareHandler._run_share_management_ecs_task( task_definition_param_str='ecs/task_def_arn/share_reapplier', diff --git a/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py b/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py index 225f069bd..095010c53 100644 --- a/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_reapplier_task.py @@ -1,6 +1,7 @@ import logging import os import sys +from typing import List from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_object_models import ShareObject @@ -16,26 +17,39 @@ class EcsBulkShareRepplyService: @classmethod - def process_reapply_shares_for_dataset(cls, engine, dataset_uri): + def reapply_for_share_objects(cls, engine, session, share_objects: List[str]): + share_object: ShareObject + processed_shares = [] + for share_object in share_objects: + log.info( + f'Re-applying Share Items for Share Object (Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' + ) + processed_shares.append(share_object) + ShareStatusRepository.update_share_item_health_status_batch( + session=session, + share_uri=share_object.shareUri, + old_status=ShareItemHealthStatus.Unhealthy.value, + new_status=ShareItemHealthStatus.PendingReApply.value, + ) + SharingService.reapply_share(engine, share_uri=share_object.shareUri) + return processed_shares + + @classmethod + def process_reapply_shares_for_dataset(cls, engine, dataset_uris: List[str]): with engine.scoped_session() as session: processed_share_objects = [] - share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset( - session=session, dataset_uri=dataset_uri - ) - log.info(f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}') - share_object: ShareObject - for share_object in share_objects_for_dataset: + log.info(f'Found {len(dataset_uris)} datasets for which shares have to be reapplied') + for dataset_uri in dataset_uris: + share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset( + session=session, dataset_uri=dataset_uri + ) log.info( - f'Re-applying Share Items for Share Object (Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' + f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}' ) - processed_share_objects.append(share_object.shareUri) - ShareStatusRepository.update_share_item_health_status_batch( - session=session, - share_uri=share_object.shareUri, - old_status=ShareItemHealthStatus.Unhealthy.value, - new_status=ShareItemHealthStatus.PendingReApply.value, + processed_shares = cls.reapply_for_share_objects( + engine, session, share_objects=share_objects_for_dataset ) - SharingService.reapply_share(engine, share_uri=share_object.shareUri) + processed_share_objects.extend(processed_shares) return processed_share_objects @classmethod @@ -43,39 +57,48 @@ def process_reapply_shares(cls, engine): with engine.scoped_session() as session: processed_share_objects = [] all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) - log.info(f'Found {len(all_share_objects)} share objects ') - share_object: ShareObject - for share_object in all_share_objects: - log.info( - f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' - ) - processed_share_objects.append(share_object.shareUri) - ShareStatusRepository.update_share_item_health_status_batch( - session=session, - share_uri=share_object.shareUri, - old_status=ShareItemHealthStatus.Unhealthy.value, - new_status=ShareItemHealthStatus.PendingReApply.value, - ) - SharingService.reapply_share(engine, share_uri=share_object.shareUri) + log.info(f'Found {len(all_share_objects)} share objects for reapply') + processed_shares = cls.reapply_for_share_objects(engine, session, share_objects=all_share_objects) + processed_share_objects.extend(processed_shares) + return processed_share_objects + + @classmethod + def process_reapply_shares_for_share_uris(cls, engine, share_object_uris: List[str]): + with engine.scoped_session() as session: + share_objects: List[ShareObject] = [ + ShareObjectRepository.get_share_by_uri(session, share_uri) for share_uri in share_object_uris + ] + processed_share_objects = [] + log.info(f'{len(share_objects)} share objects to be reapplied') + processed_shares = cls.reapply_for_share_objects(engine, session, share_objects=share_objects) + processed_share_objects.extend(processed_shares) return processed_share_objects -def reapply_shares(engine, dataset_uri): +def reapply_shares(engine, dataset_uris: List[str], share_object_uris: List[str]): """ A method used by the scheduled ECS Task to re-apply_share() on all data.all active shares If dataset_uri is provided this ECS will reapply on all unhealthy shares belonging to a dataset else it will reapply on all data.all active unhealthy shares. """ - if dataset_uri: - return EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, dataset_uri) + processed_shares = [] + if len(dataset_uris) > 0: + processed_shares.append(EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, dataset_uris)) + if len(share_object_uris) > 0: + processed_shares.append( + EcsBulkShareRepplyService.process_reapply_shares_for_share_uris(engine, share_object_uris) + ) else: - return EcsBulkShareRepplyService.process_reapply_shares(engine) + processed_shares.append(EcsBulkShareRepplyService.process_reapply_shares(engine)) + + return processed_shares if __name__ == '__main__': load_modules(modes={ImportMode.SHARES_TASK}) ENVNAME = os.environ.get('envname', 'local') ENGINE = get_engine(envname=ENVNAME) - dataset_uri = os.environ.get('datasetUri', '') - processed_shares = reapply_shares(engine=ENGINE, dataset_uri=dataset_uri) + dataset_uris: List[str] = os.environ.get('datasetUris', []) + share_object_uris: List[str] = os.environ.get('shareUris', []) + processed_shares = reapply_shares(engine=ENGINE, dataset_uris=dataset_uris, share_object_uris=share_object_uris) log.info(f'Finished processing {len(processed_shares)} shares') diff --git a/backend/dataall/modules/shares_base/tasks/share_verifier_task.py b/backend/dataall/modules/shares_base/tasks/share_verifier_task.py index a731a5756..e1a44f3f6 100644 --- a/backend/dataall/modules/shares_base/tasks/share_verifier_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_verifier_task.py @@ -1,9 +1,11 @@ import logging import os import sys +from typing import List + from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository from dataall.modules.shares_base.db.share_object_models import ShareObject -from dataall.modules.shares_base.services.shares_enums import ShareItemStatus +from dataall.modules.shares_base.services.shares_enums import ShareItemStatus, ShareItemHealthStatus from dataall.modules.shares_base.services.sharing_service import SharingService from dataall.core.stacks.aws.ecs import Ecs @@ -35,11 +37,20 @@ def verify_shares(engine): return processed_share_objects -def trigger_reapply_task(): +def trigger_reapply_task(engine): + with engine.scoped_session as session: + unhealthy_share_objects: List[ShareObject] = ShareObjectRepository.get_share_object_with_health_status( + session, ShareItemHealthStatus.Unhealthy.value + ) + unhealthy_share_objects_uris: List[str] = [share_object.shareUri for share_object in unhealthy_share_objects] + + context = { + {'name': 'shareUris', 'value': unhealthy_share_objects_uris}, + } Ecs.run_ecs_task( task_definition_param='ecs/task_def_arn/share_reapplier', container_name_param='ecs/container/share_reapplier', - context=[], + context=context, ) @@ -49,4 +60,4 @@ def trigger_reapply_task(): ENGINE = get_engine(envname=ENVNAME) processed_shares = verify_shares(engine=ENGINE) log.info(f'Finished verifying {len(processed_shares)} shares, triggering reapply...') - trigger_reapply_task() + trigger_reapply_task(ENGINE) From 54b1f3f7df0e77628c44fb594ea682284ea608c4 Mon Sep 17 00:00:00 2001 From: trajopadhye Date: Thu, 1 May 2025 21:39:19 -0500 Subject: [PATCH 2/3] Corrections --- .../dataall/modules/shares_base/tasks/share_verifier_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/dataall/modules/shares_base/tasks/share_verifier_task.py b/backend/dataall/modules/shares_base/tasks/share_verifier_task.py index e1a44f3f6..07cd60251 100644 --- a/backend/dataall/modules/shares_base/tasks/share_verifier_task.py +++ b/backend/dataall/modules/shares_base/tasks/share_verifier_task.py @@ -38,7 +38,7 @@ def verify_shares(engine): def trigger_reapply_task(engine): - with engine.scoped_session as session: + with engine.scoped_session() as session: unhealthy_share_objects: List[ShareObject] = ShareObjectRepository.get_share_object_with_health_status( session, ShareItemHealthStatus.Unhealthy.value ) From bb05712ba4fbe9c54ccabb8856b343c5bbe6adde Mon Sep 17 00:00:00 2001 From: trajopadhye Date: Thu, 1 May 2025 21:49:35 -0500 Subject: [PATCH 3/3] Linting corrections --- .../metadata_forms/services/attached_metadata_form_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py b/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py index 6af07e57c..91ae4a504 100644 --- a/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py +++ b/backend/dataall/modules/metadata_forms/services/attached_metadata_form_service.py @@ -6,7 +6,6 @@ from dataall.modules.metadata_forms.services.metadata_form_permissions import ATTACH_METADATA_FORM - class AttachedMetadataFormValidationService: @staticmethod def validate_filled_form_params(uri, data):