Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .coverage
Binary file not shown.
26 changes: 13 additions & 13 deletions python/ouroboros/helpers/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,27 +122,27 @@ def num_digits_for_n_files(n: int) -> int:
return len(str(n - 1))


def np_convert(target_dtype: np.dtype, source: ArrayLike, normalize=True, safe_bool=False):
""" TODO: Fix for Negative Values """
def np_convert(target_dtype: np.dtype, source: ArrayLike,
preset_min: np.number = None, preset_max: np.number = None,
normalize: bool = True, zero_guard: bool = True, safe_bool: bool = False):
if safe_bool and target_dtype == bool:
return source.astype(target_dtype).astype(np.uint8)
elif np.issubdtype(target_dtype, np.integer) and normalize:
dtype_range = np.iinfo(target_dtype).max - np.iinfo(target_dtype).min
source_range = np.max(source) - np.min(source)

# Avoid divide by 0, esp. as numpy segfaults when you do.
if source_range == 0.0:
source_range = 1.0

return (source * max(int(dtype_range / source_range), 1)).astype(target_dtype)
elif np.issubdtype(target_dtype, np.floating) and normalize:
source_range = np.max(source) - np.min(source)
if normalize:
source_floor = (preset_min if preset_min is not None else np.min(source)) * -1
source_range = (preset_max if preset_max is not None else np.max(source)) + source_floor

# Avoid divide by 0, esp. as numpy segfaults when you do.
if source_range == 0.0:
source_range = 1.0

return (source / source_range).astype(target_dtype)
if np.issubdtype(target_dtype, np.integer) and normalize:
dtype_range = np.iinfo(target_dtype).max - np.iinfo(target_dtype).min
return ((source + source_floor) * max(dtype_range / source_range, 1)).astype(target_dtype)
elif np.issubdtype(target_dtype, np.floating) and normalize:
return ((source + source_floor) / source_range).astype(target_dtype)
elif preset_min is not None and preset_min < 0 and zero_guard:
return (source - preset_min).astype(target_dtype)
else:
return source.astype(target_dtype)

Expand Down
2 changes: 2 additions & 0 deletions python/ouroboros/helpers/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class SliceOptions(CommonOptions):
False # Whether to connect the start and end of the given annotation points
)
annotation_mip_level: int = 0 # MIP level for the annotation layer
normalize_output: bool = True
zeroguard_output: bool = True

@field_serializer("bounding_box_params")
def serialize_bounding_box_params(self, value: BoundingBoxParams):
Expand Down
162 changes: 84 additions & 78 deletions python/ouroboros/pipeline/slice_parallel_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from functools import partial
from pathlib import Path
import sys
import threading
import traceback
Expand All @@ -14,6 +15,7 @@
format_tiff_name,
join_path,
num_digits_for_n_files,
np_convert
)
from .pipeline import PipelineStep
from ouroboros.helpers.mem import SharedNPArray
Expand Down Expand Up @@ -62,6 +64,10 @@ def _process(self, input_data: tuple[any]) -> None | str:
if not isinstance(slice_rects, np.ndarray):
return "Input data must contain an array of slice rects."

# Make sure slice rects is not empty
if len(slice_rects) == 0:
return "No slice rects were provided."

# Create a folder with the same name as the output file
folder_name = join_path(
config.output_file_folder,
Expand All @@ -74,59 +80,53 @@ def _process(self, input_data: tuple[any]) -> None | str:
output_file_path = join_path(
config.output_file_folder, format_slice_output_file(config.output_file_name)
)
temp_file_path = Path(
config.output_file_folder, format_slice_output_file(config.output_file_name)
).with_suffix(".temptif")

# Start setting up metadata
# Volume cache resolution is in voxel size, but .tiff XY resolution is in voxels per unit, so we invert.
resolution = [1.0 / voxel_size for voxel_size in volume_cache.get_resolution_um()[:2] * 0.0001]
resolutionunit = "CENTIMETER"
# However, Z Resolution doesn't have an inbuilt property or strong convention, so going with this.
metadata = {
"spacing": volume_cache.get_resolution_um()[2],
"unit": "um"
}

# Determine the dimensions of the image
has_color_channels = volume_cache.has_color_channels()
num_color_channels = (
volume_cache.get_num_channels() if has_color_channels else None
)

# Create an empty tiff to store the slices
if config.make_single_file:
# Make sure slice rects is not empty
if len(slice_rects) == 0:
return "No slice rects were provided."

try:
# Volume cache resolution is in voxel size, but .tiff XY resolution is in voxels per unit, so we invert.
resolution = [1.0 / voxel_size for voxel_size in volume_cache.get_resolution_um()[:2] * 0.0001]
resolutionunit = "CENTIMETER"
# However, Z Resolution doesn't have an inbuilt property or strong convention, so going with this.
metadata = {
"spacing": volume_cache.get_resolution_um()[2],
"unit": "um"
}

# Determine the dimensions of the image
has_color_channels = volume_cache.has_color_channels()
num_color_channels = (
volume_cache.get_num_channels() if has_color_channels else None
)

# Create a single tif file with the same dimensions as the slices
temp_shape = (
slice_rects.shape[0],
config.slice_width,
config.slice_height,
) + ((num_color_channels,) if has_color_channels else ())
temp_data = np.zeros(temp_shape, dtype=np.float16)

imwrite(
output_file_path,
temp_data,
software="ouroboros",
resolution=resolution[:2], # XY Resolution
resolutionunit=resolutionunit,
photometric=(
"rgb"
if has_color_channels and num_color_channels > 1
else "minisblack"
),
metadata=metadata
)
except BaseException as e:
return f"Error creating single tif file: {e}"
tiff_metadata = {
"software": "ouroboros",
"resolution": resolution[:2] + [resolutionunit], # XY Resolution
"photometric": ("rgb" if has_color_channels and num_color_channels > 1 else "minisblack"),
"metadata": metadata
}

# Create temporary memmap (single tif file with the same dimensions as the slices)
temp_shape = (
slice_rects.shape[0],
config.slice_width,
config.slice_height,
) + ((int(num_color_channels),) if has_color_channels else ())

temp_file = memmap(temp_file_path, shape=temp_shape, dtype=np.float32, **tiff_metadata)

# Calculate the number of digits needed to store the number of slices
num_digits = num_digits_for_n_files(len(slice_rects))

# Processing completion marker.
all_work_done = threading.Event()

# Minimum and maximum boundaries.
# bound_shm = SharedNPArray("boundaries", X(2), np.float64, allocate=True)
# with bound_shm[:] as boundaries:
boundaries = np.zeros(2, dtype=np.float32)

# Start the download volumes process and process downloaded volumes as they become available in the queue
try:
with concurrent.futures.ThreadPoolExecutor(
Expand All @@ -145,11 +145,8 @@ def _process(self, input_data: tuple[any]) -> None | str:
partial_slice_executor = partial(
process_worker_save_parallel,
config=config,
folder_name=folder_name,
slice_rects=slice_rects,
num_threads=self.num_threads,
num_digits=num_digits,
single_output_path=output_file_path if config.make_single_file else None,
temporary_path=temp_file_path,
shared=volume_cache.use_shared)

partial_dl_executor = partial(dl_worker,
Expand All @@ -162,7 +159,10 @@ def dl_completed(future):
process_futures[-1].add_done_callback(processor_completed)

def processor_completed(future):
volume_index, durations = future.result()
volume_index, durations, min_val, max_val = future.result()
# with bound_shm[:] as boundaries:
boundaries[0] = min(boundaries[0], min_val)
boundaries[1] = max(boundaries[1], max_val)
if volume_cache.use_shared:
volume_cache.remove_volume(volume_index, destroy_shared=True)
for key, value in durations.items():
Expand All @@ -187,6 +187,27 @@ def processor_completed(future):

all_work_done.wait()

with multiprocessing.pool.ThreadPool(self.num_processes) as pool:
# with bound_shm[:] as boundaries:
convert_func = partial(np_convert,
target_dtype=volume_cache.get_volume_dtype(),
normalize=config.normalize_output,
zero_guard=config.zeroguard_output,
preset_min=boundaries[0],
preset_max=boundaries[1])
if config.make_single_file:
target_file = memmap(output_file_path, shape=temp_shape, dtype=volume_cache.get_volume_dtype(),
**tiff_metadata)
pool.starmap(memmap_normalized, [(temp_file, target_file, convert_func, i)
for i in range(len(temp_file))])
else:
pool.starmap(write_normalized,
[(temp_file,
partial(imwrite, join_path(folder_name, format_tiff_name(i, num_digits)), **tiff_metadata), # noqa: E501
convert_func,
i) for i in range(len(temp_file))])
del temp_file
temp_file_path.unlink()
except BaseException as e:
traceback.print_tb(e.__traceback__, file=sys.stderr)
return f"Error downloading data: {e}"
Expand All @@ -208,12 +229,9 @@ def dl_worker(volume_cache: VolumeCache, volume: int, parallel_fetch: bool = Fal

def process_worker_save_parallel(
config: SliceOptions,
folder_name: str,
processing_data: tuple[np.ndarray | SharedNPArray, np.ndarray, np.ndarray, int],
slice_rects: np.ndarray,
num_threads: int,
num_digits: int,
single_output_path: str = None,
temporary_path: str = None,
shared: bool = False
) -> tuple[int, dict[str, list[float]]]:
volume, bounding_box, slice_indices, volume_index = processing_data
Expand Down Expand Up @@ -245,32 +263,20 @@ def process_worker_save_parallel(
)
durations["slice_volume"].append(time.perf_counter() - start)

if single_output_path is None:
# Using a ThreadPoolExecutor within the process for saving slices
with concurrent.futures.ThreadPoolExecutor(
max_workers=num_threads
) as thread_executor:
futures = []

for i, slice_i in zip(slice_indices, slices):
start = time.perf_counter()
filename = join_path(folder_name, format_tiff_name(i, num_digits))
futures.append(thread_executor.submit(save_thread, filename, slice_i))
durations["save"].append(time.perf_counter() - start)

for future in concurrent.futures.as_completed(futures):
future.result()
else:
# Save the slices to a previously created tiff file
mmap = memmap(single_output_path)
mmap[slice_indices] = slices
mmap.flush()
del mmap
# Save the slices to a previously created tiff file
mmap = memmap(temporary_path)
mmap[slice_indices] = slices
mmap.flush()
del mmap

durations["total_process"].append(time.perf_counter() - start_total)

return volume_index, durations
return volume_index, durations, np.min(slices), np.max(slices)


def memmap_normalized(source: np.memmap, target: np.memmap, convert: callable, index: int):
target[index] = convert(source=source[index, :])


def save_thread(filename: str, data: np.ndarray):
imwrite(filename, data, software="ouroboros")
def write_normalized(source: np.memmap, writer: callable, convert: callable, index: int):
writer(data=convert(source=source[index]))
6 changes: 6 additions & 0 deletions src/renderer/src/interfaces/options.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ export class SliceOptionsFile extends CompoundEntry {
'Whether to output one tiff stack file or a folder of files.'
),
new Entry('connect_start_and_end', 'Connect Endpoints', false, 'boolean').withHidden(),
new Entry('normalize_output', 'Normalize Output', true, 'boolean').withDescription(
'Whether to normalize the output data.'
),
new Entry('zeroguard_output', 'Zero-Guard Output', true, 'boolean').withDescription(
'Whether to fix sub-zero slice values that may cause overflow artifacts.'
),
new Entry('flush_cache', 'Flush CloudVolume Cache', false, 'boolean').withHidden(),
new CompoundEntry('bounding_box_params', 'Bounding Box Parameters', [
new Entry('max_depth', 'Max Depth', 12, 'number').withDescription(
Expand Down