Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data_management/DluWatcher
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM python:3.10-slim-bullseye

USER root

COPY requirements.txt ./

RUN pip3 install --progress-bar off --no-cache-dir -r requirements.txt
Expand Down
22 changes: 19 additions & 3 deletions data_management/services/dlu_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from zarr_checksum import compute_zarr_checksum
from zarr_checksum.generators import yield_files_local
from mmap import mmap, ACCESS_READ
import subprocess

logger = logging.getLogger("DLUFilesystem")
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -114,11 +115,26 @@ def chown_dir(self, package_id: str, files: list[DLUFile], user_id):
if os.stat(subdir_path).st_uid != user_id or os.stat(subdir_path).st_gid != int(os.environ['dlu_group']):
os.chown(subdir_path, user_id, int(os.environ['dlu_group']))

def rename_and_move_files(self, file_list: list[DLUFile], slide_name_map, package_id ):
    """Copy a package's files into a fresh DLU package directory under their mapped names.

    The destination directory is removed if it already exists and is then
    recreated, so the copy always starts from an empty directory. Source files
    are copied, not moved: they remain in the Globus package directory.

    Args:
        file_list: Files to copy; only each entry's ``name`` is read.
        slide_name_map: Mapping from source file name to destination file
            name. A name missing from the mapping raises ``KeyError``.
        package_id: Appended to the configured directory prefixes to build
            the source and destination package directories.

    Returns:
        A list with one new ``DLUFile`` per copied file, carrying the mapped
        name, the destination directory, and the checksum and size read back
        from the copied file. Empty when ``file_list`` is empty.
    """
    dest_package_directory = os.path.join(self.dlu_data_directory, self.dlu_package_dir_prefix + package_id)
    # Remove any existing destination directory; after this it never exists,
    # so it is always (re)created below.
    if os.path.exists(dest_package_directory):
        shutil.rmtree(dest_package_directory)
    logger.info("Creating directory " + dest_package_directory)
    os.makedirs(dest_package_directory, exist_ok=True)

    source_package_directory = self.globus_data_directory + '/' + self.globus_dir_prefix + package_id
    dlu_files = []
    for source_file in file_list:
        new_name = slide_name_map[source_file.name]
        source_path = os.path.join(source_package_directory, source_file.name)
        dest_file = os.path.join(dest_package_directory, new_name)
        logger.info("Copying file " + source_path + " to " + dest_file)
        shutil.copy(source_path, dest_file)
        # Checksum and size are taken from the copy, not from the source file.
        dlu_files.append(DLUFile(name=new_name, path=dest_package_directory,
                                 checksum=calculate_checksum(dest_file), size=os.path.getsize(dest_file)))
    return dlu_files

def copy_files(self, package_id: str, file_list: list[DLUFile], preserve_path: bool = False, no_src_package: bool = False):
files_copied = 0
Expand Down
8 changes: 6 additions & 2 deletions data_management/services/dlu_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def find_all_files(self):
)

def update_md5(self, file_id: str, checksum: str, package_id: str):
    """Set ``dlu_md5checksum`` on the ``dlu_file`` row matching the file and package ids.

    Args:
        file_id: Value matched against ``dlu_file_id``.
        checksum: New value for ``dlu_md5checksum``.
        package_id: Value matched against ``dlu_package_id``.

    Returns:
        Whatever ``self.db.insert_data`` returns for the UPDATE statement.
    """
    return self.db.insert_data(
        "UPDATE dlu_file SET dlu_md5checksum = %s WHERE dlu_file_id = %s and dlu_package_id = %s",
        (checksum, file_id, package_id),
    )

def move_globus_files_to_dlu(self, package_id: str):
Expand Down Expand Up @@ -263,8 +263,12 @@ def get_slide_manifest_import_by_kit(self, kit_id, stain):
(kit_id,stain,))

def set_error_message_slide_scan_curation(self, error, image_id):
    """Set ``error_message`` on the ``slide_scan_curation`` rows with the given ``image_id``.

    Args:
        error: Text stored in ``error_message``.
        image_id: Value matched against ``image_id``.

    Returns:
        Whatever ``self.db.insert_data`` returns for the UPDATE statement.
    """
    return self.db.insert_data(
        "UPDATE slide_scan_curation set error_message = %s where image_id = %s",
        (error, image_id),
    )

def set_error_message_slide_scan_curation_redcap_id(self, error, redcap_id):
    """Set ``error_message`` on the ``slide_scan_curation`` rows with the given ``redcap_id``.

    The statement is sent through ``self.db.insert_data_no_alert`` and that
    call's result is returned unchanged.
    """
    statement = "UPDATE slide_scan_curation set error_message = %s where redcap_id = %s"
    bind_values = (error, redcap_id)
    return self.db.insert_data_no_alert(statement, bind_values)

def find_slide_scan_info_by_package_id(self, package_id):
return self.db.get_data("SELECT * FROM slide_scan_v WHERE dlu_package_id = %s",
Expand Down
84 changes: 57 additions & 27 deletions data_management/watch_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ def pickup_waiting_packages(self):

def move_packages_to_DLU(self, packages):
file_list = None

for _, package in enumerate(packages):
skip_copy = False
package_id = package['dlu_package_id']
logger.info("Moving package " + package_id)

Expand All @@ -91,55 +93,65 @@ def move_packages_to_DLU(self, packages):
success = self.do_wsi_file_renames(globus_data_directory, package_id)
if not success:
continue
else:
skip_copy = True

# We do end up doing this check twice for WSIs but we are modifying the filenames, so it is probably good
directory_info = DirectoryInfo(globus_data_directory)
if not self.is_directory_valid(directory_info, package_id):
continue
if not skip_copy:
directory_info = DirectoryInfo(globus_data_directory)
if not self.is_directory_valid(directory_info, package_id):
continue

if directory_info.file_count == 0 and directory_info.subdir_count == 1:
contents = "".join(directory_info.dir_contents)
top_level_subdir = package_id + "/" + contents
file_list = self.dlu_file_handler.match_files(top_level_subdir)
else:
file_list = self.dlu_file_handler.match_files(package_id)

self.dlu_file_handler.copy_files(package_id, self.process_file_paths(directory_info.file_details))
self.dlu_file_handler.chown_dir(package_id, file_list, int(os.environ['dlu_user']))
file_info = self.dlu_management.insert_dlu_files(package_id, file_list)
self.dlu_management.update_dlu_package(package_id, { "globus_dlu_status": "success" })
self.dlu_management.update_dlu_package(package_id, { "ready_to_move_from_globus": "done" })
self.dlu_mongo.update_package_files(package_id, file_info)
self.dlu_state.set_package_state(package_id, PackageState.UPLOAD_SUCCEEDED)
self.dlu_state.clear_cache()
if directory_info.file_count == 0 and directory_info.subdir_count == 1:
contents = "".join(directory_info.dir_contents)
top_level_subdir = package_id + "/" + contents
file_list = self.dlu_file_handler.match_files(top_level_subdir)
else:
file_list = self.dlu_file_handler.match_files(package_id)

self.dlu_file_handler.copy_files(package_id, self.process_file_paths(directory_info.file_details))
self.dlu_file_handler.chown_dir(package_id, file_list, int(os.environ['dlu_user']))
file_info = self.dlu_management.insert_dlu_files(package_id, file_list)
self.dlu_management.update_dlu_package(package_id, { "globus_dlu_status": "success" })
self.dlu_management.update_dlu_package(package_id, { "ready_to_move_from_globus": "done" })
self.dlu_mongo.update_package_files(package_id, file_info)

self.dlu_state.set_package_state(package_id, PackageState.UPLOAD_SUCCEEDED)
self.dlu_state.clear_cache()

def fill_in_null_package_ids(self):
    """Delegate to ``self.slide_management.fill_in_package_ids()``; nothing is returned."""
    slide_management = self.slide_management
    slide_management.fill_in_package_ids()

def do_wsi_file_renames(self, globus_data_directory: str, package_id: str):
logger.info("starting rename process")
error_msg = ""
slide_scan_info = self.dlu_management.find_slide_scan_info_by_package_id(package_id)
if slide_scan_info is None or len(slide_scan_info) == 0:
error_msg = "Error: Package not found in slide_scan_v"
self.log_err_message_slide_rename("Error: Package not found in slide_scan_v", package_id)
return False

missing_slides = self.dlu_management.is_package_missing_slides(package_id)
if missing_slides is not None and len(missing_slides) > 0:
error_msg = "Error: Package is missing slides"
self.log_err_message_slide_rename( "Error: Package is missing slides", package_id)
return False

slides_in_error = self.dlu_management.is_slides_in_error(package_id)
if slides_in_error is not None and len(slides_in_error) > 0:
error_msg = "Error: Package has some slides in error"
self.log_err_message_slide_rename("Error: Package has some slides in error", package_id)
return False

unapproved_files = self.dlu_management.find_not_approved_filenames(package_id)
if unapproved_files is not None and len(unapproved_files) > 0:
error_msg = "Error: Package has unapproved filenames"
self.log_err_message_slide_rename("Error: Package has unapproved filenames", package_id)
return False

directory_info = DirectoryInfo(globus_data_directory)
directory_info = DirectoryInfo(globus_data_directory, calculate_checksums=False)
if not self.is_directory_valid(directory_info, package_id):
# This method logs errors in it, so no need to continue, or capture error message
return False

if directory_info.file_count == 0 or directory_info.file_count != len(slide_scan_info):
error_msg = "Error: Globus file count does not match expectation"
self.log_err_message_slide_rename("Error: Globus file count does not match expectation", package_id)
return False

# No need to calc checksums here, we just need the list of files
file_list = self.dlu_file_handler.match_files(package_id, calculate_checksums=False)
Expand All @@ -157,15 +169,33 @@ def do_wsi_file_renames(self, globus_data_directory: str, package_id: str):
self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg})
return False

self.dlu_file_handler.rename_files(file_list, slide_name_map,package_id)
copied_files = self.dlu_file_handler.rename_and_move_files(file_list, slide_name_map, package_id)
if len(copied_files) == 0:
return False

self.dlu_file_handler.chown_dir(package_id, copied_files, int(os.environ['dlu_user']))
file_info = self.dlu_management.insert_dlu_files(package_id=package_id, file_list=copied_files)
self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": "success"})
self.dlu_management.update_dlu_package(package_id, {"ready_to_move_from_globus": "done"})
self.dlu_mongo.update_package_files(package_id, file_info)

self.dlu_state.set_package_state(package_id, PackageState.UPLOAD_SUCCEEDED)
self.dlu_state.clear_cache()

return True

def log_err_message_slide_rename(self, error_msg, package_id):
    """Log ``error_msg`` at error level and store it as the package's ``globus_dlu_status``.

    Args:
        error_msg: Message to log; also written verbatim as the status value.
        package_id: Package the message refers to; appended to the log line
            and used as the key for the status update.
    """
    log_line = error_msg + " for package: " + package_id
    logger.error(log_line)
    status_update = {"globus_dlu_status": error_msg}
    self.dlu_management.update_dlu_package(package_id, status_update)

def is_directory_valid(self, directory_info, package_id):
    """Report whether a package directory contains at least one file or subdirectory.

    Args:
        directory_info: Object exposing ``file_count`` and ``subdir_count``.
        package_id: Package being checked; used in the message and the
            status update when the directory is empty.

    Returns:
        ``True`` when either count is non-zero. Otherwise the problem is
        logged, written to the package's ``globus_dlu_status``, and ``False``
        is returned.
    """
    logger.info("checking if directory is valid")
    has_content = directory_info.file_count != 0 or directory_info.subdir_count != 0
    if has_content:
        return True

    error_msg = "Error: package " + package_id + " has no files or top level subdirectory"
    logger.info(error_msg + " Skipping.")
    self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg})
    return False


if __name__ == "__main__":
Expand Down