From d2d20789f88a82549b10ae2a6ffe93031b74bcce Mon Sep 17 00:00:00 2001 From: Becky Reamy Date: Thu, 4 Dec 2025 10:58:29 -0500 Subject: [PATCH] KPMP-5807: Bunch of fixes to get happy path working --- data_management/DluWatcher | 2 + data_management/services/dlu_filesystem.py | 22 +++++- data_management/services/dlu_management.py | 14 ++-- data_management/watch_files.py | 84 +++++++++++++++------- 4 files changed, 85 insertions(+), 37 deletions(-) diff --git a/data_management/DluWatcher b/data_management/DluWatcher index 4818e3f..c44ab9e 100644 --- a/data_management/DluWatcher +++ b/data_management/DluWatcher @@ -1,5 +1,7 @@ FROM python:3.10-slim-bullseye +USER root + COPY requirements.txt ./ RUN pip3 install --progress-bar off --no-cache-dir -r requirements.txt diff --git a/data_management/services/dlu_filesystem.py b/data_management/services/dlu_filesystem.py index a8a0ae7..f1b9d44 100644 --- a/data_management/services/dlu_filesystem.py +++ b/data_management/services/dlu_filesystem.py @@ -8,6 +8,7 @@ from zarr_checksum import compute_zarr_checksum from zarr_checksum.generators import yield_files_local from mmap import mmap, ACCESS_READ +import subprocess logger = logging.getLogger("DLUFilesystem") logger.setLevel(logging.INFO) @@ -114,11 +115,26 @@ def chown_dir(self, package_id: str, files: list[DLUFile], user_id): if os.stat(subdir_path).st_uid != user_id or os.stat(subdir_path).st_gid != int(os.environ['dlu_group']): os.chown(subdir_path, user_id, int(os.environ['dlu_group'])) - def rename_files(self, file_list: list[DLUFile], slide_name_map, package_id ): + def rename_and_move_files(self, file_list: list[DLUFile], slide_name_map, package_id ): + dluFiles = [] + dest_package_directory = os.path.join(self.dlu_data_directory, self.dlu_package_dir_prefix + package_id) + if os.path.exists(dest_package_directory): + shutil.rmtree(dest_package_directory) + if not os.path.exists(dest_package_directory): + logger.info("Creating directory " + dest_package_directory) + os.makedirs(dest_package_directory, exist_ok=True) + source_package_directory = self.globus_data_directory + '/' + self.globus_dir_prefix + package_id for file in file_list: - os.rename(os.path.join(source_package_directory, file.name), - os.path.join(source_package_directory, slide_name_map[file.name])) + dest_file = os.path.join(dest_package_directory, slide_name_map[file.name]) + logger.info("Copying file " + os.path.join(source_package_directory, file.name) + " to " + + os.path.join(dest_package_directory, slide_name_map[file.name])) + shutil.copy(os.path.join(source_package_directory, file.name), + dest_file) + file = DLUFile(name=slide_name_map[file.name], path=dest_package_directory, + checksum=calculate_checksum(dest_file), size=os.path.getsize(dest_file)) + dluFiles.append(file) + return dluFiles def copy_files(self, package_id: str, file_list: list[DLUFile], preserve_path: bool = False, no_src_package: bool = False): files_copied = 0 diff --git a/data_management/services/dlu_management.py b/data_management/services/dlu_management.py index 6ccf2e8..e5eab61 100644 --- a/data_management/services/dlu_management.py +++ b/data_management/services/dlu_management.py @@ -160,7 +160,7 @@ def find_all_files(self): ) def update_md5(self, file_id: str, checksum: str, package_id: str): - self.db.insert_data("UPDATE dlu_file SET dlu_md5checksum = %s WHERE dlu_file_id = %s and dlu_package_id = %s", + return self.db.insert_data("UPDATE dlu_file SET dlu_md5checksum = %s WHERE dlu_file_id = %s and dlu_package_id = %s", (checksum, file_id,package_id)) def move_globus_files_to_dlu(self, package_id: str): @@ -252,27 +252,27 @@ def get_slide_manifest_import_by_kit(self, kit_id, stain): (kit_id,stain,)) def set_error_message_slide_scan_curation(self, error, image_id): - self.db.insert_data("UPDATE slide_scan_curation set error_message = %s where image_id = %s", + return self.db.insert_data("UPDATE slide_scan_curation set error_message = %s where image_id = %s", (error, image_id,)) def set_error_message_slide_scan_curation_redcap_id(self, error, redcap_id): - self.db.insert_data_no_alert("UPDATE slide_scan_curation set error_message = %s where redcap_id = %s", + return self.db.insert_data_no_alert("UPDATE slide_scan_curation set error_message = %s where redcap_id = %s", (error, redcap_id,)) def find_slide_scan_info_by_package_id(self, package_id): - self.db.get_data("SELECT * FROM slide_scan_v WHERE dlu_package_id = %s", + return self.db.get_data("SELECT * FROM slide_scan_v WHERE dlu_package_id = %s", (package_id,)) def is_package_missing_slides(self, package_id): - self.db.get_data("SELECT * FROM slide_scan_v WHERE dlu_package_id = %s and missing_slides = 1", + return self.db.get_data("SELECT * FROM slide_scan_v WHERE dlu_package_id = %s and missing_slides = 1", (package_id,)) def is_slides_in_error(self, package_id): - self.db.get_data("SELECT * FROM slide_scan_curation WHERE dlu_package_id = %s and error_message IS NOT NULL", + return self.db.get_data("SELECT * FROM slide_scan_curation WHERE dlu_package_id = %s and error_message IS NOT NULL", (package_id,)) def find_not_approved_filenames(self, package_id): - self.db.get_data("SELECT * FROM slide_scan_curation WHERE approve_file_name = 'yes' AND dlu_package_id = %s", + return self.db.get_data("SELECT * FROM slide_scan_curation WHERE approve_file_name = 'yes' AND dlu_package_id = %s", (package_id,)) diff --git a/data_management/watch_files.py b/data_management/watch_files.py index df757ad..cae3fd3 100644 --- a/data_management/watch_files.py +++ b/data_management/watch_files.py @@ -75,7 +75,9 @@ def pickup_waiting_packages(self): def move_packages_to_DLU(self, packages): file_list = None + for _, package in enumerate(packages): + skip_copy = False package_id = package['dlu_package_id'] logger.info("Moving package " + package_id) @@ -91,52 +93,62 @@ def move_packages_to_DLU(self, packages): success = self.do_wsi_file_renames(globus_data_directory, package_id) if not success: continue + else: + skip_copy = True - # We do end up doing this check twice for WSIs but we are modifying the filenames, so it is probably good - directory_info = DirectoryInfo(globus_data_directory) - if not self.is_directory_valid(directory_info, package_id): - continue + if not skip_copy: + directory_info = DirectoryInfo(globus_data_directory) + if not self.is_directory_valid(directory_info, package_id): + continue - if directory_info.file_count == 0 and directory_info.subdir_count == 1: - contents = "".join(directory_info.dir_contents) - top_level_subdir = package_id + "/" + contents - file_list = self.dlu_file_handler.match_files(top_level_subdir) - else: - file_list = self.dlu_file_handler.match_files(package_id) - - self.dlu_file_handler.copy_files(package_id, self.process_file_paths(directory_info.file_details)) - self.dlu_file_handler.chown_dir(package_id, file_list, int(os.environ['dlu_user'])) - file_info = self.dlu_management.insert_dlu_files(package_id, file_list) - self.dlu_management.update_dlu_package(package_id, { "globus_dlu_status": "success" }) - self.dlu_management.update_dlu_package(package_id, { "ready_to_move_from_globus": "done" }) - self.dlu_mongo.update_package_files(package_id, file_info) - - self.dlu_state.set_package_state(package_id, PackageState.UPLOAD_SUCCEEDED) - self.dlu_state.clear_cache() + if directory_info.file_count == 0 and directory_info.subdir_count == 1: + contents = "".join(directory_info.dir_contents) + top_level_subdir = package_id + "/" + contents + file_list = self.dlu_file_handler.match_files(top_level_subdir) + else: + file_list = self.dlu_file_handler.match_files(package_id) + + self.dlu_file_handler.copy_files(package_id, self.process_file_paths(directory_info.file_details)) + self.dlu_file_handler.chown_dir(package_id, file_list, int(os.environ['dlu_user'])) + file_info = self.dlu_management.insert_dlu_files(package_id, file_list) + self.dlu_management.update_dlu_package(package_id, { "globus_dlu_status": "success" }) + self.dlu_management.update_dlu_package(package_id, { "ready_to_move_from_globus": "done" }) + self.dlu_mongo.update_package_files(package_id, file_info) + + self.dlu_state.set_package_state(package_id, PackageState.UPLOAD_SUCCEEDED) + self.dlu_state.clear_cache() def do_wsi_file_renames(self, globus_data_directory: str, package_id: str): + logger.info("starting rename process") error_msg = "" slide_scan_info = self.dlu_management.find_slide_scan_info_by_package_id(package_id) if slide_scan_info is None or len(slide_scan_info) == 0: - error_msg = "Error: Package not found in slide_scan_v" + self.log_err_message_slide_rename("Error: Package not found in slide_scan_v", package_id) + return False missing_slides = self.dlu_management.is_package_missing_slides(package_id) if missing_slides is not None and len(missing_slides) > 0: - error_msg = "Error: Package is missing slides" + self.log_err_message_slide_rename( "Error: Package is missing slides", package_id) + return False + slides_in_error = self.dlu_management.is_slides_in_error(package_id) if slides_in_error is not None and len(slides_in_error) > 0: - error_msg = "Error: Package has some slides in error" + self.log_err_message_slide_rename("Error: Package has some slides in error", package_id) + return False + unapproved_files = self.dlu_management.find_not_approved_filenames(package_id) if unapproved_files is not None and len(unapproved_files) > 0: - error_msg = "Error: Package has unapproved filenames" + self.log_err_message_slide_rename("Error: Package has unapproved filenames", package_id) + return False - directory_info = DirectoryInfo(globus_data_directory) + directory_info = DirectoryInfo(globus_data_directory, calculate_checksums=False) if not self.is_directory_valid(directory_info, package_id): # This method logs errors in it, so no need to continue, or capture error message return False if directory_info.file_count == 0 or directory_info.file_count != len(slide_scan_info): - error_msg = "Error: Globus file count does not match expectation" + self.log_err_message_slide_rename("Error: Globus file count does not match expectation", package_id) + return False # No need to calc checksums here, we just need the list of files file_list = self.dlu_file_handler.match_files(package_id, calculate_checksums=False) @@ -154,15 +166,33 @@ def do_wsi_file_renames(self, globus_data_directory: str, package_id: str): self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg}) return False - self.dlu_file_handler.rename_files(file_list, slide_name_map,package_id) + copied_files = self.dlu_file_handler.rename_and_move_files(file_list, slide_name_map, package_id) + if len(copied_files) == 0: + return False + + self.dlu_file_handler.chown_dir(package_id, copied_files, int(os.environ['dlu_user'])) + file_info = self.dlu_management.insert_dlu_files(package_id=package_id, file_list=copied_files) + self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": "success"}) + self.dlu_management.update_dlu_package(package_id, {"ready_to_move_from_globus": "done"}) + self.dlu_mongo.update_package_files(package_id, file_info) + + self.dlu_state.set_package_state(package_id, PackageState.UPLOAD_SUCCEEDED) + self.dlu_state.clear_cache() + return True + def log_err_message_slide_rename(self, error_msg, package_id): + logger.error(error_msg + " for package: " + package_id) + self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg}) + def is_directory_valid(self, directory_info, package_id): + logger.info("checking if directory is valid") if directory_info.file_count == 0 and directory_info.subdir_count == 0: error_msg = "Error: package " + package_id + " has no files or top level subdirectory" logger.info(error_msg + " Skipping.") self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg}) return False + return True if __name__ == "__main__":