@@ -196,6 +196,18 @@ def get_share_data_items_by_type(session, share, share_type_model, share_type_ur
query = query.filter(ShareObjectItem.healthStatus == healthStatus)
return query.all()

@staticmethod
def get_share_object_with_health_status(session, healthStatus=None):
query = (
session.query(ShareObject)
.join(
ShareObjectItem,
ShareObject.shareUri == ShareObjectItem.shareUri,
)
.filter(ShareObjectItem.healthStatus == healthStatus)
)
return query.all()

@staticmethod
def get_all_share_items_in_share(session, share_uri, status=None, healthStatus=None):
query = (
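The new get_share_object_with_health_status helper above is what the verifier task further down uses to decide which shares need reapplying. A minimal sketch of how it might be called to collect those share URIs; the imports mirror ones appearing elsewhere in this diff, while the wrapper function and the deduplication are illustrative additions, not part of the change:

from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.services.shares_enums import ShareItemHealthStatus


def list_unhealthy_share_uris(session):
    # Illustrative only: collect the URIs of every share object that has at
    # least one item in the given health status, via the new repository helper.
    unhealthy_shares = ShareObjectRepository.get_share_object_with_health_status(
        session, healthStatus=ShareItemHealthStatus.Unhealthy.value
    )
    # The join yields one row per matching item, so a share with several
    # unhealthy items can come back more than once; deduplicate by URI.
    return list({share.shareUri for share in unhealthy_shares})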
@@ -42,10 +42,10 @@ def cleanup_share(engine, task: Task):
def reapply_shares_of_dataset(engine, task: Task):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, task.targetUri)
EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, [task.targetUri])
else:
context = [
{'name': 'datasetUri', 'value': task.targetUri},
{'name': 'datasetUris', 'value': [task.targetUri]},
]
return EcsShareHandler._run_share_management_ecs_task(
task_definition_param_str='ecs/task_def_arn/share_reapplier',
93 changes: 58 additions & 35 deletions backend/dataall/modules/shares_base/tasks/share_reapplier_task.py
@@ -1,6 +1,7 @@
import logging
import os
import sys
from typing import List

from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_models import ShareObject
@@ -16,66 +17,88 @@

class EcsBulkShareRepplyService:
@classmethod
def process_reapply_shares_for_dataset(cls, engine, dataset_uri):
def reapply_for_share_objects(cls, engine, session, share_objects: List[ShareObject]):
share_object: ShareObject
processed_shares = []
for share_object in share_objects:
log.info(
f'Re-applying Share Items for Share Object (Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}'
)
processed_shares.append(share_object)
ShareStatusRepository.update_share_item_health_status_batch(
session=session,
share_uri=share_object.shareUri,
old_status=ShareItemHealthStatus.Unhealthy.value,
new_status=ShareItemHealthStatus.PendingReApply.value,
)
SharingService.reapply_share(engine, share_uri=share_object.shareUri)
return processed_shares

@classmethod
def process_reapply_shares_for_dataset(cls, engine, dataset_uris: List[str]):
with engine.scoped_session() as session:
processed_share_objects = []
share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset(
session=session, dataset_uri=dataset_uri
)
log.info(f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}')
share_object: ShareObject
for share_object in share_objects_for_dataset:
log.info(f'Found {len(dataset_uris)} datasets for which shares have to be reapplied')
for dataset_uri in dataset_uris:
share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset(
session=session, dataset_uri=dataset_uri
)
log.info(
f'Re-applying Share Items for Share Object (Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}'
f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}'
)
processed_share_objects.append(share_object.shareUri)
ShareStatusRepository.update_share_item_health_status_batch(
session=session,
share_uri=share_object.shareUri,
old_status=ShareItemHealthStatus.Unhealthy.value,
new_status=ShareItemHealthStatus.PendingReApply.value,
processed_shares = cls.reapply_for_share_objects(
engine, session, share_objects=share_objects_for_dataset
)
SharingService.reapply_share(engine, share_uri=share_object.shareUri)
processed_share_objects.extend(processed_shares)
return processed_share_objects

@classmethod
def process_reapply_shares(cls, engine):
with engine.scoped_session() as session:
processed_share_objects = []
all_share_objects: List[ShareObject] = ShareObjectRepository.list_all_active_share_objects(session)
log.info(f'Found {len(all_share_objects)} share objects ')
share_object: ShareObject
for share_object in all_share_objects:
log.info(
f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}'
)
processed_share_objects.append(share_object.shareUri)
ShareStatusRepository.update_share_item_health_status_batch(
session=session,
share_uri=share_object.shareUri,
old_status=ShareItemHealthStatus.Unhealthy.value,
new_status=ShareItemHealthStatus.PendingReApply.value,
)
SharingService.reapply_share(engine, share_uri=share_object.shareUri)
log.info(f'Found {len(all_share_objects)} share objects for reapply')
processed_shares = cls.reapply_for_share_objects(engine, session, share_objects=all_share_objects)
processed_share_objects.extend(processed_shares)
return processed_share_objects

@classmethod
def process_reapply_shares_for_share_uris(cls, engine, share_object_uris: List[str]):
with engine.scoped_session() as session:
share_objects: List[ShareObject] = [
ShareObjectRepository.get_share_by_uri(session, share_uri) for share_uri in share_object_uris
]
processed_share_objects = []
log.info(f'{len(share_objects)} share objects to be reapplied')
processed_shares = cls.reapply_for_share_objects(engine, session, share_objects=share_objects)
processed_share_objects.extend(processed_shares)
return processed_share_objects


def reapply_shares(engine, dataset_uri):
def reapply_shares(engine, dataset_uris: List[str], share_object_uris: List[str]):
"""
A method used by the scheduled ECS Task to re-apply_share() on data.all active shares.
If dataset_uris is provided, this task reapplies the unhealthy shares belonging to those datasets.
If share_object_uris is provided, it reapplies the listed share objects;
else it reapplies on all data.all active unhealthy shares.
"""
if dataset_uri:
return EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, dataset_uri)
processed_shares = []
if len(dataset_uris) > 0:
processed_shares.append(EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, dataset_uris))
if len(share_object_uris) > 0:
processed_shares.append(
EcsBulkShareRepplyService.process_reapply_shares_for_share_uris(engine, share_object_uris)
)
else:
return EcsBulkShareRepplyService.process_reapply_shares(engine)
processed_shares.append(EcsBulkShareRepplyService.process_reapply_shares(engine))

return processed_shares


if __name__ == '__main__':
load_modules(modes={ImportMode.SHARES_TASK})
ENVNAME = os.environ.get('envname', 'local')
ENGINE = get_engine(envname=ENVNAME)
dataset_uri = os.environ.get('datasetUri', '')
processed_shares = reapply_shares(engine=ENGINE, dataset_uri=dataset_uri)
dataset_uris: List[str] = os.environ.get('datasetUris', [])
share_object_uris: List[str] = os.environ.get('shareUris', [])
processed_shares = reapply_shares(engine=ENGINE, dataset_uris=dataset_uris, share_object_uris=share_object_uris)
log.info(f'Finished processing {len(processed_shares)} shares')
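One caveat for the __main__ block above: os.environ.get returns a plain string when the variable is set, so the datasetUris/shareUris values presumably need to be deserialized before they reach reapply_shares. A minimal sketch, assuming a comma-separated encoding (the actual serialization used when the ECS handler builds the task context is not shown in this diff):

import os
from typing import List


def _uris_from_env(var_name: str) -> List[str]:
    # Hypothetical helper: the comma-separated encoding is an assumption, since
    # this diff does not show how the ECS context value ends up in the env var.
    raw = os.environ.get(var_name, '')
    return [uri.strip() for uri in raw.split(',') if uri.strip()]


dataset_uris: List[str] = _uris_from_env('datasetUris')
share_object_uris: List[str] = _uris_from_env('shareUris')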
19 changes: 15 additions & 4 deletions backend/dataall/modules/shares_base/tasks/share_verifier_task.py
@@ -1,9 +1,11 @@
import logging
import os
import sys
from typing import List

from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.services.shares_enums import ShareItemStatus
from dataall.modules.shares_base.services.shares_enums import ShareItemStatus, ShareItemHealthStatus
from dataall.modules.shares_base.services.sharing_service import SharingService
from dataall.core.stacks.aws.ecs import Ecs

@@ -35,11 +37,20 @@ def verify_shares(engine):
return processed_share_objects


def trigger_reapply_task():
def trigger_reapply_task(engine):
with engine.scoped_session() as session:
unhealthy_share_objects: List[ShareObject] = ShareObjectRepository.get_share_object_with_health_status(
session, ShareItemHealthStatus.Unhealthy.value
)
unhealthy_share_objects_uris: List[str] = [share_object.shareUri for share_object in unhealthy_share_objects]

context = [
{'name': 'shareUris', 'value': unhealthy_share_objects_uris},
]
Ecs.run_ecs_task(
task_definition_param='ecs/task_def_arn/share_reapplier',
container_name_param='ecs/container/share_reapplier',
context=[],
context=context,
)


@@ -49,4 +60,4 @@ def trigger_reapply_task():
ENGINE = get_engine(envname=ENVNAME)
processed_shares = verify_shares(engine=ENGINE)
log.info(f'Finished verifying {len(processed_shares)} shares, triggering reapply...')
trigger_reapply_task()
trigger_reapply_task(ENGINE)
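Since trigger_reapply_task now receives the engine and looks up unhealthy shares itself, one design option is to skip launching the reapplier ECS task when the verifier found nothing unhealthy. A sketch of that variant; the early return and its log line are illustrative additions, everything else follows the code above:

def trigger_reapply_task(engine):
    with engine.scoped_session() as session:
        unhealthy_share_objects: List[ShareObject] = ShareObjectRepository.get_share_object_with_health_status(
            session, ShareItemHealthStatus.Unhealthy.value
        )
        unhealthy_share_objects_uris: List[str] = [share_object.shareUri for share_object in unhealthy_share_objects]

    # Illustrative guard, not part of this diff: avoid an empty reapplier run.
    if not unhealthy_share_objects_uris:
        log.info('No unhealthy share items found, skipping reapply task')
        return

    Ecs.run_ecs_task(
        task_definition_param='ecs/task_def_arn/share_reapplier',
        container_name_param='ecs/container/share_reapplier',
        context=[{'name': 'shareUris', 'value': unhealthy_share_objects_uris}],
    )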