diff --git a/.coverage b/.coverage index bd69e2a..e03130e 100644 Binary files a/.coverage and b/.coverage differ diff --git a/python/ouroboros/helpers/files.py b/python/ouroboros/helpers/files.py index 2ae0de5..de23456 100644 --- a/python/ouroboros/helpers/files.py +++ b/python/ouroboros/helpers/files.py @@ -122,27 +122,27 @@ def num_digits_for_n_files(n: int) -> int: return len(str(n - 1)) -def np_convert(target_dtype: np.dtype, source: ArrayLike, normalize=True, safe_bool=False): - """ TODO: Fix for Negative Values """ +def np_convert(target_dtype: np.dtype, source: ArrayLike, + preset_min: np.number = None, preset_max: np.number = None, + normalize: bool = True, zero_guard: bool = True, safe_bool: bool = False): if safe_bool and target_dtype == bool: return source.astype(target_dtype).astype(np.uint8) - elif np.issubdtype(target_dtype, np.integer) and normalize: - dtype_range = np.iinfo(target_dtype).max - np.iinfo(target_dtype).min - source_range = np.max(source) - np.min(source) - - # Avoid divide by 0, esp. as numpy segfaults when you do. - if source_range == 0.0: - source_range = 1.0 - return (source * max(int(dtype_range / source_range), 1)).astype(target_dtype) - elif np.issubdtype(target_dtype, np.floating) and normalize: - source_range = np.max(source) - np.min(source) + if normalize: + source_floor = (preset_min if preset_min is not None else np.min(source)) * -1 + source_range = (preset_max if preset_max is not None else np.max(source)) + source_floor # Avoid divide by 0, esp. as numpy segfaults when you do. if source_range == 0.0: source_range = 1.0 - return (source / source_range).astype(target_dtype) + if np.issubdtype(target_dtype, np.integer) and normalize: + dtype_range = np.iinfo(target_dtype).max - np.iinfo(target_dtype).min + return ((source + source_floor) * max(dtype_range / source_range, 1)).astype(target_dtype) + elif np.issubdtype(target_dtype, np.floating) and normalize: + return ((source + source_floor) / source_range).astype(target_dtype) + elif preset_min is not None and preset_min < 0 and zero_guard: + return (source - preset_min).astype(target_dtype) else: return source.astype(target_dtype) diff --git a/python/ouroboros/helpers/options.py b/python/ouroboros/helpers/options.py index 55a466c..fec2330 100644 --- a/python/ouroboros/helpers/options.py +++ b/python/ouroboros/helpers/options.py @@ -41,6 +41,8 @@ class SliceOptions(CommonOptions): False # Whether to connect the start and end of the given annotation points ) annotation_mip_level: int = 0 # MIP level for the annotation layer + normalize_output: bool = True + zeroguard_output: bool = True @field_serializer("bounding_box_params") def serialize_bounding_box_params(self, value: BoundingBoxParams): diff --git a/python/ouroboros/pipeline/slice_parallel_pipeline.py b/python/ouroboros/pipeline/slice_parallel_pipeline.py index b0427ec..be9ebb9 100644 --- a/python/ouroboros/pipeline/slice_parallel_pipeline.py +++ b/python/ouroboros/pipeline/slice_parallel_pipeline.py @@ -1,4 +1,5 @@ from functools import partial +from pathlib import Path import sys import threading import traceback @@ -14,6 +15,7 @@ format_tiff_name, join_path, num_digits_for_n_files, + np_convert ) from .pipeline import PipelineStep from ouroboros.helpers.mem import SharedNPArray @@ -62,6 +64,10 @@ def _process(self, input_data: tuple[any]) -> None | str: if not isinstance(slice_rects, np.ndarray): return "Input data must contain an array of slice rects." + # Make sure slice rects is not empty + if len(slice_rects) == 0: + return "No slice rects were provided." + # Create a folder with the same name as the output file folder_name = join_path( config.output_file_folder, @@ -74,52 +80,41 @@ def _process(self, input_data: tuple[any]) -> None | str: output_file_path = join_path( config.output_file_folder, format_slice_output_file(config.output_file_name) ) + temp_file_path = Path( + config.output_file_folder, format_slice_output_file(config.output_file_name) + ).with_suffix(".temptif") + + # Start setting up metadata + # Volume cache resolution is in voxel size, but .tiff XY resolution is in voxels per unit, so we invert. + resolution = [1.0 / voxel_size for voxel_size in volume_cache.get_resolution_um()[:2] * 0.0001] + resolutionunit = "CENTIMETER" + # However, Z Resolution doesn't have an inbuilt property or strong convention, so going with this. + metadata = { + "spacing": volume_cache.get_resolution_um()[2], + "unit": "um" + } + + # Determine the dimensions of the image + has_color_channels = volume_cache.has_color_channels() + num_color_channels = ( + volume_cache.get_num_channels() if has_color_channels else None + ) - # Create an empty tiff to store the slices - if config.make_single_file: - # Make sure slice rects is not empty - if len(slice_rects) == 0: - return "No slice rects were provided." - - try: - # Volume cache resolution is in voxel size, but .tiff XY resolution is in voxels per unit, so we invert. - resolution = [1.0 / voxel_size for voxel_size in volume_cache.get_resolution_um()[:2] * 0.0001] - resolutionunit = "CENTIMETER" - # However, Z Resolution doesn't have an inbuilt property or strong convention, so going with this. - metadata = { - "spacing": volume_cache.get_resolution_um()[2], - "unit": "um" - } - - # Determine the dimensions of the image - has_color_channels = volume_cache.has_color_channels() - num_color_channels = ( - volume_cache.get_num_channels() if has_color_channels else None - ) - - # Create a single tif file with the same dimensions as the slices - temp_shape = ( - slice_rects.shape[0], - config.slice_width, - config.slice_height, - ) + ((num_color_channels,) if has_color_channels else ()) - temp_data = np.zeros(temp_shape, dtype=np.float16) - - imwrite( - output_file_path, - temp_data, - software="ouroboros", - resolution=resolution[:2], # XY Resolution - resolutionunit=resolutionunit, - photometric=( - "rgb" - if has_color_channels and num_color_channels > 1 - else "minisblack" - ), - metadata=metadata - ) - except BaseException as e: - return f"Error creating single tif file: {e}" + tiff_metadata = { + "software": "ouroboros", + "resolution": resolution[:2] + [resolutionunit], # XY Resolution + "photometric": ("rgb" if has_color_channels and num_color_channels > 1 else "minisblack"), + "metadata": metadata + } + + # Create temporary memmap (single tif file with the same dimensions as the slices) + temp_shape = ( + slice_rects.shape[0], + config.slice_width, + config.slice_height, + ) + ((int(num_color_channels),) if has_color_channels else ()) + + temp_file = memmap(temp_file_path, shape=temp_shape, dtype=np.float32, **tiff_metadata) # Calculate the number of digits needed to store the number of slices num_digits = num_digits_for_n_files(len(slice_rects)) @@ -127,6 +122,11 @@ def _process(self, input_data: tuple[any]) -> None | str: # Processing completion marker. all_work_done = threading.Event() + # Minimum and maximum boundaries. +# bound_shm = SharedNPArray("boundaries", X(2), np.float64, allocate=True) +# with bound_shm[:] as boundaries: + boundaries = np.zeros(2, dtype=np.float32) + # Start the download volumes process and process downloaded volumes as they become available in the queue try: with concurrent.futures.ThreadPoolExecutor( @@ -145,11 +145,8 @@ def _process(self, input_data: tuple[any]) -> None | str: partial_slice_executor = partial( process_worker_save_parallel, config=config, - folder_name=folder_name, slice_rects=slice_rects, - num_threads=self.num_threads, - num_digits=num_digits, - single_output_path=output_file_path if config.make_single_file else None, + temporary_path=temp_file_path, shared=volume_cache.use_shared) partial_dl_executor = partial(dl_worker, @@ -162,7 +159,10 @@ def dl_completed(future): process_futures[-1].add_done_callback(processor_completed) def processor_completed(future): - volume_index, durations = future.result() + volume_index, durations, min_val, max_val = future.result() +# with bound_shm[:] as boundaries: + boundaries[0] = min(boundaries[0], min_val) + boundaries[1] = max(boundaries[1], max_val) if volume_cache.use_shared: volume_cache.remove_volume(volume_index, destroy_shared=True) for key, value in durations.items(): @@ -187,6 +187,27 @@ def processor_completed(future): all_work_done.wait() + with multiprocessing.pool.ThreadPool(self.num_processes) as pool: + # with bound_shm[:] as boundaries: + convert_func = partial(np_convert, + target_dtype=volume_cache.get_volume_dtype(), + normalize=config.normalize_output, + zero_guard=config.zeroguard_output, + preset_min=boundaries[0], + preset_max=boundaries[1]) + if config.make_single_file: + target_file = memmap(output_file_path, shape=temp_shape, dtype=volume_cache.get_volume_dtype(), + **tiff_metadata) + pool.starmap(memmap_normalized, [(temp_file, target_file, convert_func, i) + for i in range(len(temp_file))]) + else: + pool.starmap(write_normalized, + [(temp_file, + partial(imwrite, join_path(folder_name, format_tiff_name(i, num_digits)), **tiff_metadata), # noqa: E501 + convert_func, + i) for i in range(len(temp_file))]) + del temp_file + temp_file_path.unlink() except BaseException as e: traceback.print_tb(e.__traceback__, file=sys.stderr) return f"Error downloading data: {e}" @@ -208,12 +229,9 @@ def dl_worker(volume_cache: VolumeCache, volume: int, parallel_fetch: bool = Fal def process_worker_save_parallel( config: SliceOptions, - folder_name: str, processing_data: tuple[np.ndarray | SharedNPArray, np.ndarray, np.ndarray, int], slice_rects: np.ndarray, - num_threads: int, - num_digits: int, - single_output_path: str = None, + temporary_path: str = None, shared: bool = False ) -> tuple[int, dict[str, list[float]]]: volume, bounding_box, slice_indices, volume_index = processing_data @@ -245,32 +263,20 @@ def process_worker_save_parallel( ) durations["slice_volume"].append(time.perf_counter() - start) - if single_output_path is None: - # Using a ThreadPoolExecutor within the process for saving slices - with concurrent.futures.ThreadPoolExecutor( - max_workers=num_threads - ) as thread_executor: - futures = [] - - for i, slice_i in zip(slice_indices, slices): - start = time.perf_counter() - filename = join_path(folder_name, format_tiff_name(i, num_digits)) - futures.append(thread_executor.submit(save_thread, filename, slice_i)) - durations["save"].append(time.perf_counter() - start) - - for future in concurrent.futures.as_completed(futures): - future.result() - else: - # Save the slices to a previously created tiff file - mmap = memmap(single_output_path) - mmap[slice_indices] = slices - mmap.flush() - del mmap + # Save the slices to a previously created tiff file + mmap = memmap(temporary_path) + mmap[slice_indices] = slices + mmap.flush() + del mmap durations["total_process"].append(time.perf_counter() - start_total) - return volume_index, durations + return volume_index, durations, np.min(slices), np.max(slices) + + +def memmap_normalized(source: np.memmap, target: np.memmap, convert: callable, index: int): + target[index] = convert(source=source[index, :]) -def save_thread(filename: str, data: np.ndarray): - imwrite(filename, data, software="ouroboros") +def write_normalized(source: np.memmap, writer: callable, convert: callable, index: int): + writer(data=convert(source=source[index])) diff --git a/src/renderer/src/interfaces/options.tsx b/src/renderer/src/interfaces/options.tsx index 4d17958..2d44df6 100644 --- a/src/renderer/src/interfaces/options.tsx +++ b/src/renderer/src/interfaces/options.tsx @@ -241,6 +241,12 @@ export class SliceOptionsFile extends CompoundEntry { 'Whether to output one tiff stack file or a folder of files.' ), new Entry('connect_start_and_end', 'Connect Endpoints', false, 'boolean').withHidden(), + new Entry('normalize_output', 'Normalize Output', true, 'boolean').withDescription( + 'Whether to normalize the output data.' + ), + new Entry('zeroguard_output', 'Zero-Guard Output', true, 'boolean').withDescription( + 'Whether to fix sub-zero slice values that may cause overflow artifacts.' + ), new Entry('flush_cache', 'Flush CloudVolume Cache', false, 'boolean').withHidden(), new CompoundEntry('bounding_box_params', 'Bounding Box Parameters', [ new Entry('max_depth', 'Max Depth', 12, 'number').withDescription(