From 6a0cb0a8672e48a33e84835cfe0f4c98147ad1f4 Mon Sep 17 00:00:00 2001 From: justincdavis Date: Fri, 27 Feb 2026 19:21:08 -0600 Subject: [PATCH] feat: add tests/image/ test submodule Co-Authored-By: Claude Opus 4.6 --- tests/image/conftest.py | 229 ++++++++++++ tests/image/kernels/conftest.py | 16 + tests/image/kernels/test_letterbox.py | 143 ++++++++ tests/image/kernels/test_linear.py | 124 +++++++ tests/image/kernels/test_performance.py | 128 +++++++ tests/image/kernels/test_sst.py | 260 +++++++++++++ tests/image/onnx/test_preproc_engine.py | 210 +++++++++++ tests/image/test_classifier.py | 96 +++++ tests/image/test_depth_estimator.py | 71 ++++ tests/image/test_detector.py | 110 ++++++ tests/image/test_image_model.py | 113 ++++++ tests/image/test_postproc.py | 469 ++++++++++++++++++++++++ tests/image/test_preproc.py | 363 ++++++++++++++++++ tests/image/test_sahi.py | 84 +++++ 14 files changed, 2416 insertions(+) create mode 100644 tests/image/conftest.py create mode 100644 tests/image/kernels/conftest.py create mode 100644 tests/image/kernels/test_letterbox.py create mode 100644 tests/image/kernels/test_linear.py create mode 100644 tests/image/kernels/test_performance.py create mode 100644 tests/image/kernels/test_sst.py create mode 100644 tests/image/onnx/test_preproc_engine.py create mode 100644 tests/image/test_classifier.py create mode 100644 tests/image/test_depth_estimator.py create mode 100644 tests/image/test_detector.py create mode 100644 tests/image/test_image_model.py create mode 100644 tests/image/test_postproc.py create mode 100644 tests/image/test_preproc.py create mode 100644 tests/image/test_sahi.py diff --git a/tests/image/conftest.py b/tests/image/conftest.py new file mode 100644 index 00000000..fbca91d7 --- /dev/null +++ b/tests/image/conftest.py @@ -0,0 +1,229 @@ +# Copyright (c) 2025-2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc,no-any-return" +from __future__ import annotations + +import 
tempfile +from typing import Callable + +import numpy as np +import pytest + +from trtutils.image.preprocessors import ( + CPUPreprocessor, + CUDAPreprocessor, + TRTPreprocessor, +) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +PREPROC_SIZE = (640, 640) +PREPROC_RANGE = (0.0, 1.0) +PREPROC_DTYPE = np.dtype(np.float32) +IMAGENET_MEAN = (0.485, 0.456, 0.406) +IMAGENET_STD = (0.229, 0.224, 0.225) + +# Tolerance for CPU/GPU parity +CUDA_MAG_BOUNDS = 0.01 + + +# --------------------------------------------------------------------------- +# Build support detection +# --------------------------------------------------------------------------- +@pytest.fixture(scope="session") +def _trt_build_supported() -> bool: + """Check if TRT can build engines on this hardware (session-cached).""" + try: + from pathlib import Path + + from trtutils.builder._build import build_engine + + onnx_path = Path(__file__).parent.parent.parent / "data" / "simple.onnx" + if not onnx_path.exists(): + return False + with tempfile.NamedTemporaryFile(suffix=".engine", delete=True) as f: + build_engine(onnx_path, f.name, optimization_level=1) + return True + except RuntimeError: + return False + except Exception: + return False + + +# --------------------------------------------------------------------------- +# Parametrized fixtures +# --------------------------------------------------------------------------- +@pytest.fixture(params=["cpu", "cuda", "trt"]) +def preprocessor_type(request: pytest.FixtureRequest) -> str: + """Provide preprocessor type identifiers.""" + return request.param + + +@pytest.fixture(params=["linear", "letterbox"]) +def resize_method(request: pytest.FixtureRequest) -> str: + """Provide resize method identifiers.""" + return request.param + + +# --------------------------------------------------------------------------- +# Preprocessor factory +# 
--------------------------------------------------------------------------- +@pytest.fixture +def make_preprocessor( + _trt_build_supported: bool, +) -> Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor]: + """Return a factory that builds preprocessors by type.""" + + def _make( + ptype: str, + *, + mean: tuple[float, float, float] | None = None, + std: tuple[float, float, float] | None = None, + batch_size: int = 4, + ) -> CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor: + if ptype == "cpu": + return CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE, mean=mean, std=std) + if ptype == "cuda": + return CUDAPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE, mean=mean, std=std) + if ptype == "trt": + if not _trt_build_supported: + pytest.skip("TRT cannot build engines for this GPU") + return TRTPreprocessor( + PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE, mean=mean, std=std, batch_size=batch_size + ) + err_msg = f"Unknown preprocessor type: {ptype}" + raise ValueError(err_msg) + + return _make + + +# --------------------------------------------------------------------------- +# Output mock generators +# --------------------------------------------------------------------------- +@pytest.fixture +def make_yolov10_output() -> Callable[[int, int], list[np.ndarray]]: + """Return a factory for YOLOv10-like outputs.""" + + def _make(batch_size: int, num_dets: int = 10) -> list[np.ndarray]: + output = np.zeros((batch_size, 300, 6), dtype=np.float32) + for b in range(batch_size): + for i in range(num_dets): + offset = b * 50 + output[b, i] = [ + 100 + i * 10 + offset, + 100 + i * 10 + offset, + 200 + i * 10 + offset, + 200 + i * 10 + offset, + 0.9 - i * 0.05, + i % 10, + ] + return [output] + + return _make + + +@pytest.fixture +def make_efficient_nms_output() -> Callable[[int, int], list[np.ndarray]]: + """Return a factory for EfficientNMS-like outputs.""" + + def _make(batch_size: int, num_dets: int = 10) -> list[np.ndarray]: + max_dets = 
100 + num_dets_arr = np.full((batch_size,), num_dets, dtype=np.int32) + bboxes = np.zeros((batch_size, max_dets, 4), dtype=np.float32) + scores = np.zeros((batch_size, max_dets), dtype=np.float32) + class_ids = np.zeros((batch_size, max_dets), dtype=np.float32) + for b in range(batch_size): + offset = b * 50 + for i in range(num_dets): + bboxes[b, i] = [ + 100 + i * 10 + offset, + 100 + i * 10 + offset, + 200 + i * 10 + offset, + 200 + i * 10 + offset, + ] + scores[b, i] = 0.9 - i * 0.05 + class_ids[b, i] = i % 10 + return [num_dets_arr, bboxes, scores, class_ids] + + return _make + + +@pytest.fixture +def make_rfdetr_output() -> Callable[[int, int, int, int], list[np.ndarray]]: + """Return a factory for RF-DETR-like outputs.""" + + def _make( + batch_size: int, num_queries: int = 300, num_classes: int = 80, num_dets: int = 10 + ) -> list[np.ndarray]: + dets = np.zeros((batch_size, num_queries, 4), dtype=np.float32) + labels = np.full((batch_size, num_queries, num_classes), -10.0, dtype=np.float32) + for b in range(batch_size): + for i in range(num_dets): + cx = (150 + i * 10 + b * 30) / 640.0 + cy = (150 + i * 10 + b * 30) / 640.0 + w = 100 / 640.0 + h = 100 / 640.0 + dets[b, i] = [cx, cy, w, h] + class_idx = i % num_classes + labels[b, i, class_idx] = 5.0 - i * 0.3 + return [dets, labels] + + return _make + + +@pytest.fixture +def make_detr_output() -> Callable[[int, int, int], list[np.ndarray]]: + """Return a factory for DETR-like outputs.""" + + def _make(batch_size: int, num_queries: int = 300, num_dets: int = 10) -> list[np.ndarray]: + scores = np.zeros((batch_size, num_queries), dtype=np.float32) + labels = np.zeros((batch_size, num_queries), dtype=np.float32) + boxes = np.zeros((batch_size, num_queries, 4), dtype=np.float32) + for b in range(batch_size): + offset = b * 50 + for i in range(num_dets): + scores[b, i] = 0.9 - i * 0.05 + labels[b, i] = i % 10 + boxes[b, i] = [ + 100 + i * 10 + offset, + 100 + i * 10 + offset, + 200 + i * 10 + offset, + 200 + i * 
10 + offset, + ] + return [scores, labels, boxes] + + return _make + + +@pytest.fixture +def make_classification_output() -> Callable[[int, int], list[np.ndarray]]: + """Return a factory for classification outputs.""" + rng = np.random.default_rng() + + def _make(batch_size: int, num_classes: int = 1000) -> list[np.ndarray]: + output = rng.standard_normal((batch_size, num_classes)).astype(np.float32) + for b in range(batch_size): + output[b, b % num_classes] = 10.0 + output[b, (b + 1) % num_classes] = 8.0 + return [output] + + return _make + + +# --------------------------------------------------------------------------- +# Ratios/padding factory +# --------------------------------------------------------------------------- +@pytest.fixture +def make_ratios_padding() -> Callable[ + [int], tuple[list[tuple[float, float]], list[tuple[float, float]]] +]: + """Return a factory for ratios and padding lists.""" + + def _make(batch_size: int) -> tuple[list[tuple[float, float]], list[tuple[float, float]]]: + ratios = [(1.0, 1.0) for _ in range(batch_size)] + padding = [(0.0, 0.0) for _ in range(batch_size)] + return ratios, padding + + return _make diff --git a/tests/image/kernels/conftest.py b/tests/image/kernels/conftest.py new file mode 100644 index 00000000..e2bf9ad3 --- /dev/null +++ b/tests/image/kernels/conftest.py @@ -0,0 +1,16 @@ +# Copyright (c) 2025-2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +from __future__ import annotations + +import pytest + + +@pytest.fixture +def cuda_stream(): + """Create and destroy a CUDA stream for kernel tests.""" + from trtutils.core import create_stream, destroy_stream + + stream = create_stream() + yield stream + destroy_stream(stream) diff --git a/tests/image/kernels/test_letterbox.py b/tests/image/kernels/test_letterbox.py new file mode 100644 index 00000000..77fbdea9 --- /dev/null +++ b/tests/image/kernels/test_letterbox.py @@ -0,0 +1,143 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# 
MIT License +# mypy: disable-error-code="misc" +"""Tests for the letterbox resize CUDA kernel.""" + +from __future__ import annotations + +import math +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +from trtutils.core import ( + Kernel, + create_binding, + create_stream, + destroy_stream, + memcpy_device_to_host_async, + memcpy_host_to_device_async, + stream_synchronize, +) +from trtutils.image import kernels + +try: + from cv2ext.image import letterbox as cv2ext_letterbox # type: ignore[import-untyped] + + _CV2EXT_AVAILABLE = True +except ImportError: + cv2ext_letterbox = None # type: ignore[assignment] + _CV2EXT_AVAILABLE = False + +_DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" +_HORSE_IMAGE_PATH = _DATA_DIR / "horse.jpg" + + +def _run_letterbox_kernel( + img: np.ndarray, + output_shape: tuple[int, int], +) -> np.ndarray: + """Run the letterbox kernel and return result.""" + o_width, o_height = output_shape + height, width = img.shape[:2] + + stream = create_stream() + + num_threads: tuple[int, int, int] = (32, 32, 1) + num_blocks: tuple[int, int, int] = ( + math.ceil(o_width / num_threads[1]), + math.ceil(o_height / num_threads[0]), + 1, + ) + + input_binding = create_binding(img, is_input=True) + dummy_output = np.zeros((o_height, o_width, 3), dtype=np.uint8) + output_binding = create_binding(dummy_output, pagelocked_mem=True) + + scale_x = o_width / width + scale_y = o_height / height + scale = min(scale_x, scale_y) + new_width = int(width * scale) + new_height = int(height * scale) + pad_x = int((o_width - new_width) / 2) + pad_y = int((o_height - new_height) / 2) + + kernel = Kernel(kernels.LETTERBOX_RESIZE[0], kernels.LETTERBOX_RESIZE[1]) + args = kernel.create_args( + input_binding.allocation, + output_binding.allocation, + width, + height, + o_width, + o_height, + pad_x, + pad_y, + new_width, + new_height, + ) + + memcpy_host_to_device_async(input_binding.allocation, img, stream) + kernel.call(num_blocks, 
num_threads, stream, args) + memcpy_device_to_host_async(output_binding.host_allocation, output_binding.allocation, stream) + stream_synchronize(stream) + + result = output_binding.host_allocation.copy() + + destroy_stream(stream) + input_binding.free() + output_binding.free() + kernel.free() + + return result + + +class TestLetterboxKernel: + """Tests for the letterbox CUDA kernel.""" + + def test_compiles(self) -> None: + """Letterbox kernel compiles without error.""" + stream = create_stream() + compiled = Kernel(kernels.LETTERBOX_RESIZE[0], kernels.LETTERBOX_RESIZE[1]) + assert compiled is not None + destroy_stream(stream) + + @pytest.mark.skipif(not _CV2EXT_AVAILABLE, reason="cv2ext not installed") + def test_correctness_against_cv2ext(self) -> None: + """GPU letterbox result matches cv2ext.letterbox().""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_shape = (640, 480) + assert cv2ext_letterbox is not None + resized_img, _, _ = cv2ext_letterbox(img, output_shape) # type: ignore[misc] + cuda_result = _run_letterbox_kernel(img, output_shape) + + assert cuda_result.shape == resized_img.shape + cpu_mean = np.mean(resized_img) + assert cpu_mean - 0.5 <= np.mean(cuda_result) <= cpu_mean + 0.5 + diff_mask = np.any(resized_img != cuda_result, axis=-1) + avg_diff = np.mean(np.abs(resized_img[diff_mask] - cuda_result[diff_mask])) + assert avg_diff < 1.0 + + @pytest.mark.parametrize( + "output_shape", + [(640, 640), (416, 416), (320, 320)], + ids=["640x640", "416x416", "320x320"], + ) + def test_various_target_sizes(self, output_shape: tuple[int, int]) -> None: + """Letterbox kernel works with various target sizes.""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + o_width, o_height = 
output_shape + result = _run_letterbox_kernel(img, output_shape) + assert result.shape == (o_height, o_width, 3) diff --git a/tests/image/kernels/test_linear.py b/tests/image/kernels/test_linear.py new file mode 100644 index 00000000..50ccd460 --- /dev/null +++ b/tests/image/kernels/test_linear.py @@ -0,0 +1,124 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +"""Tests for the linear resize CUDA kernel.""" + +from __future__ import annotations + +import math +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +from trtutils.core import ( + Kernel, + create_binding, + create_stream, + destroy_stream, + memcpy_device_to_host_async, + memcpy_host_to_device_async, + stream_synchronize, +) +from trtutils.image import kernels + +_DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" +_HORSE_IMAGE_PATH = _DATA_DIR / "horse.jpg" + + +def _run_linear_kernel( + img: np.ndarray, + output_shape: tuple[int, int], +) -> np.ndarray: + """Run the linear resize kernel and return result.""" + o_width, o_height = output_shape + height, width = img.shape[:2] + + stream = create_stream() + + num_threads: tuple[int, int, int] = (32, 32, 1) + num_blocks: tuple[int, int, int] = ( + math.ceil(o_width / num_threads[1]), + math.ceil(o_height / num_threads[0]), + 1, + ) + + input_binding = create_binding(img, is_input=True) + dummy_output = np.zeros((o_height, o_width, 3), dtype=np.uint8) + output_binding = create_binding(dummy_output, pagelocked_mem=True) + + kernel = Kernel(kernels.LINEAR_RESIZE[0], kernels.LINEAR_RESIZE[1]) + args = kernel.create_args( + input_binding.allocation, + output_binding.allocation, + width, + height, + o_width, + o_height, + ) + + memcpy_host_to_device_async(input_binding.allocation, img, stream) + kernel.call(num_blocks, num_threads, stream, args) + memcpy_device_to_host_async(output_binding.host_allocation, output_binding.allocation, stream) + 
stream_synchronize(stream) + + result = output_binding.host_allocation.copy() + + destroy_stream(stream) + input_binding.free() + output_binding.free() + kernel.free() + + return result + + +class TestLinearResizeKernel: + """Tests for the linear resize CUDA kernel.""" + + def test_compiles(self) -> None: + """Linear resize kernel compiles without error.""" + stream = create_stream() + compiled = Kernel(kernels.LINEAR_RESIZE[0], kernels.LINEAR_RESIZE[1]) + assert compiled is not None + destroy_stream(stream) + + def test_correctness_against_cv2(self) -> None: + """GPU linear resize matches cv2.resize(INTER_LINEAR).""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_shape = (640, 480) + o_width, o_height = output_shape + resized_img = np.asarray( + cv2.resize(img, (o_width, o_height), interpolation=cv2.INTER_LINEAR) + ) + cuda_result = _run_linear_kernel(img, output_shape) + + assert cuda_result.shape == resized_img.shape + cpu_mean = float(resized_img.mean()) + assert cpu_mean - 0.5 <= np.mean(cuda_result) <= cpu_mean + 0.5 + diff_mask = np.any(resized_img != cuda_result, axis=-1) + avg_diff = np.mean(np.abs(resized_img[diff_mask] - cuda_result[diff_mask])) + assert avg_diff < 1.0 + + @pytest.mark.parametrize( + "output_shape", + [(640, 640), (416, 416)], + ids=["640x640", "416x416"], + ) + def test_various_target_sizes(self, output_shape: tuple[int, int]) -> None: + """Linear resize kernel works with various target sizes.""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + o_width, o_height = output_shape + result = _run_linear_kernel(img, output_shape) + assert result.shape == (o_height, o_width, 3) diff --git a/tests/image/kernels/test_performance.py 
b/tests/image/kernels/test_performance.py new file mode 100644 index 00000000..d4281a5e --- /dev/null +++ b/tests/image/kernels/test_performance.py @@ -0,0 +1,128 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +""" +Performance benchmarks for image preprocessing CUDA kernels. + +Port from: tests/legacy/image/kernels/test_sst_performance.py +""" + +from __future__ import annotations + +import math +import time +from pathlib import Path +from typing import Any + +import cv2 +import numpy as np +import pytest + +from trtutils.core import ( + Kernel, + create_binding, + create_stream, + destroy_stream, + memcpy_device_to_host_async, + memcpy_host_to_device_async, + stream_synchronize, +) +from trtutils.image import kernels + +_DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" +_HORSE_IMAGE_PATH = _DATA_DIR / "horse.jpg" + + +def _get_kernel_timings(kernel_data: tuple[Any, Any], n_iter: int = 100) -> list[float]: + """Measure kernel execution timings.""" + output_height = 640 + output_width = 640 + batch_size = 1 + scale = 1.0 / 255.0 + offset = 0.0 + + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + img = cv2.resize(img, (output_width, output_height)) # type: ignore[arg-type] + + stream = create_stream() + + num_threads: tuple[int, int, int] = (32, 32, 1) + num_blocks: tuple[int, int, int] = ( + math.ceil(output_width / num_threads[0]), + math.ceil(output_height / num_threads[1]), + batch_size, + ) + + dummy_input = np.zeros((output_height, output_width, 3), dtype=np.uint8) + input_binding = create_binding(dummy_input, is_input=True) + dummy_output = np.zeros((1, 3, output_height, output_width), dtype=np.float32) + output_binding = create_binding(dummy_output, pagelocked_mem=True) + + kernel_obj = Kernel(kernel_data[0], kernel_data[1]) + args = 
kernel_obj.create_args( + input_binding.allocation, + output_binding.allocation, + scale, + offset, + output_height, + output_width, + batch_size, + ) + + memcpy_host_to_device_async(input_binding.allocation, img, stream) + kernel_obj.call(num_blocks, num_threads, stream, args) + memcpy_device_to_host_async(output_binding.host_allocation, output_binding.allocation, stream) + stream_synchronize(stream) + + timings: list[float] = [] + for _ in range(n_iter): + t0 = time.time() + kernel_obj.call(num_blocks, num_threads, stream, args) + stream_synchronize(stream) + t1 = time.time() + timings.append(t1 - t0) + + destroy_stream(stream) + input_binding.free() + output_binding.free() + kernel_obj.free() + + return timings + + +@pytest.mark.performance +class TestKernelPerformance: + """Performance benchmarks for CUDA image kernels.""" + + def test_sst_fast_faster_than_sst(self) -> None: + """SST_FAST kernel is faster than standard SCALE_SWAP_TRANSPOSE.""" + sst_fast_timings = _get_kernel_timings(kernels.SST_FAST) + sst_timings = _get_kernel_timings(kernels.SCALE_SWAP_TRANSPOSE) + + sst_fast_mean = float(np.mean(sst_fast_timings)) + sst_mean = float(np.mean(sst_timings)) + + print( + f"SST_FAST mean: {sst_fast_mean:.6f}s, SST mean: {sst_mean:.6f}s," + f" speedup: {sst_mean / sst_fast_mean:.2f}x" + ) + assert sst_fast_mean < sst_mean + + def test_sst_benchmark(self) -> None: + """Benchmark standard SST kernel timing.""" + timings = _get_kernel_timings(kernels.SCALE_SWAP_TRANSPOSE) + mean_time = float(np.mean(timings)) + print(f"SST kernel: {mean_time * 1000:.3f}ms avg over {len(timings)} iterations") + assert mean_time < 0.1 # should be well under 100ms per iteration + + def test_sst_fast_benchmark(self) -> None: + """Benchmark SST_FAST kernel timing.""" + timings = _get_kernel_timings(kernels.SST_FAST) + mean_time = float(np.mean(timings)) + print(f"SST_FAST kernel: {mean_time * 1000:.3f}ms avg over {len(timings)} iterations") + assert mean_time < 0.1 # should be well under 
100ms per iteration diff --git a/tests/image/kernels/test_sst.py b/tests/image/kernels/test_sst.py new file mode 100644 index 00000000..dbde3198 --- /dev/null +++ b/tests/image/kernels/test_sst.py @@ -0,0 +1,260 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +""" +Consolidated tests for all SST (Scale-Swap-Transpose) CUDA kernels. + +Ports from legacy: +- test_sst_kernel.py -> SCALE_SWAP_TRANSPOSE kernel +- test_sst_fast_kernel.py -> SST_FAST / SST_FAST_F16 kernels +- test_sst_imagenet_kernel.py -> IMAGENET_SST / IMAGENET_SST_F16 kernels +""" + +from __future__ import annotations + +import math +from pathlib import Path +from typing import Any + +import cv2 +import numpy as np +import pytest + +from trtutils.core import ( + Kernel, + create_binding, + create_stream, + destroy_stream, + memcpy_device_to_host_async, + memcpy_host_to_device_async, + stream_synchronize, +) +from trtutils.image import kernels +from trtutils.image.preprocessors import preprocess + +_DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" +_HORSE_IMAGE_PATH = _DATA_DIR / "horse.jpg" + +_KERNEL_MAP: dict[str, tuple[Any, Any]] = { + "sst": kernels.SCALE_SWAP_TRANSPOSE, + "sst_fast": kernels.SST_FAST, + "sst_fast_f16": kernels.SST_FAST_F16, + "sst_imagenet": kernels.IMAGENET_SST, + "sst_imagenet_f16": kernels.IMAGENET_SST_F16, +} + + +def _run_sst_kernel( + img: np.ndarray, + output_height: int, + output_width: int, + kernel_key: str, + batch_size: int = 1, + scale: float = 1.0 / 255.0, + offset: float = 0.0, + mean: tuple[float, float, float] | None = None, + std: tuple[float, float, float] | None = None, +) -> np.ndarray: + """Run an SST-family kernel and return the result.""" + kernel_data = _KERNEL_MAP[kernel_key] + is_imagenet = kernel_key in ("sst_imagenet", "sst_imagenet_f16") + is_f16 = kernel_key in ("sst_fast_f16", "sst_imagenet_f16") + + stream = create_stream() + + num_threads: tuple[int, int, int] = 
(32, 32, 1) + num_blocks: tuple[int, int, int] = ( + math.ceil(output_width / num_threads[0]), + math.ceil(output_height / num_threads[1]), + batch_size, + ) + + if batch_size > 1: + dummy_input = np.zeros((batch_size, output_height, output_width, 3), dtype=np.uint8) + batch_img = np.stack([img] * batch_size, axis=0) + input_data = batch_img + else: + dummy_input = np.zeros((output_height, output_width, 3), dtype=np.uint8) + input_data = img + + out_dtype = np.float16 if is_f16 else np.float32 + dummy_output = np.zeros((batch_size, 3, output_height, output_width), dtype=out_dtype) + + input_binding = create_binding(dummy_input, is_input=True) + output_binding = create_binding(dummy_output, pagelocked_mem=True) + + kernel_obj = Kernel(kernel_data[0], kernel_data[1]) + + if is_imagenet: + assert mean is not None + assert std is not None + mean_array = np.array(mean, dtype=np.float32).reshape(1, 3, 1, 1) + std_array = np.array(std, dtype=np.float32).reshape(1, 3, 1, 1) + mean_binding = create_binding(mean_array) + std_binding = create_binding(std_array) + + memcpy_host_to_device_async(mean_binding.allocation, mean_array, stream) + memcpy_host_to_device_async(std_binding.allocation, std_array, stream) + + args = kernel_obj.create_args( + input_binding.allocation, + output_binding.allocation, + mean_binding.allocation, + std_binding.allocation, + output_height, + output_width, + batch_size, + ) + else: + args = kernel_obj.create_args( + input_binding.allocation, + output_binding.allocation, + scale, + offset, + output_height, + output_width, + batch_size, + ) + mean_binding = None + std_binding = None + + memcpy_host_to_device_async(input_binding.allocation, input_data, stream) + kernel_obj.call(num_blocks, num_threads, stream, args) + memcpy_device_to_host_async(output_binding.host_allocation, output_binding.allocation, stream) + stream_synchronize(stream) + + result = output_binding.host_allocation.copy() + + destroy_stream(stream) + input_binding.free() + 
output_binding.free() + kernel_obj.free() + if mean_binding is not None: + mean_binding.free() + if std_binding is not None: + std_binding.free() + + return result + + +class TestSSTKernelCompilation: + """Test SST kernel compilation.""" + + @pytest.mark.parametrize("kernel_key", ["sst", "sst_fast", "sst_imagenet"]) + def test_compiles(self, kernel_key: str) -> None: + """Each SST kernel variant compiles without error.""" + stream = create_stream() + compiled = Kernel(_KERNEL_MAP[kernel_key][0], _KERNEL_MAP[kernel_key][1]) + assert compiled is not None + destroy_stream(stream) + + @pytest.mark.parametrize( + "kernel_key", + ["sst_fast_f16", "sst_imagenet_f16"], + ids=["sst_fast_f16", "sst_imagenet_f16"], + ) + def test_f16_precision_variants_compile(self, kernel_key: str) -> None: + """F16 precision variants compile without error.""" + stream = create_stream() + compiled = Kernel(_KERNEL_MAP[kernel_key][0], _KERNEL_MAP[kernel_key][1]) + assert compiled is not None + destroy_stream(stream) + + +class TestSSTKernelCorrectness: + """Test SST kernel output correctness against CPU implementation.""" + + @pytest.mark.parametrize("kernel_key", ["sst", "sst_fast"]) + def test_correctness_against_cpu(self, kernel_key: str) -> None: + """GPU SST result matches CPU preprocess() output.""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_height = output_width = 640 + img_resized = cv2.resize(img, (output_width, output_height)) + + cuda_result = _run_sst_kernel(img_resized, output_height, output_width, kernel_key) + cpu_result, _, _ = preprocess( + [img_resized], (output_width, output_height), np.dtype(np.float32) + ) + + assert cuda_result.shape == cpu_result.shape + assert np.mean(cuda_result) == np.mean(cpu_result) + assert np.allclose(cuda_result, cpu_result) + + def test_imagenet_normalization(self) -> None: + """SST_IMAGENET 
kernel applies mean/std normalization correctly.""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_height = output_width = 640 + mean = (0.485, 0.456, 0.406) + std = (0.229, 0.224, 0.225) + img_resized = cv2.resize(img, (output_width, output_height)) + + cuda_result = _run_sst_kernel( + img_resized, output_height, output_width, "sst_imagenet", mean=mean, std=std + ) + cpu_result, _, _ = preprocess( + [img_resized], + (output_width, output_height), + np.dtype(np.float32), + mean=mean, + std=std, + ) + + assert cuda_result.shape == cpu_result.shape + assert np.isclose(np.mean(cuda_result), np.mean(cpu_result), rtol=1e-6, atol=1e-6) + assert np.allclose(cuda_result, cpu_result, rtol=1e-6, atol=1e-6) + + def test_sst_fast_f16_correctness(self) -> None: + """SST_FAST_F16 kernel output matches CPU within relaxed fp16 tolerances.""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_height = output_width = 640 + img_resized = cv2.resize(img, (output_width, output_height)) + + cuda_result = _run_sst_kernel(img_resized, output_height, output_width, "sst_fast_f16") + cpu_result, _, _ = preprocess( + [img_resized], (output_width, output_height), np.dtype(np.float32) + ) + + assert cuda_result.shape == cpu_result.shape + assert np.isclose(np.mean(cuda_result), np.mean(cpu_result), rtol=1e-3, atol=1e-3) + assert np.allclose(cuda_result, cpu_result, rtol=1e-3, atol=1e-3) + + +class TestSSTBatchProcessing: + """Test SST kernel with batch inputs.""" + + @pytest.mark.parametrize("kernel_key", ["sst_fast"]) + def test_batch_matches_single(self, kernel_key: str) -> None: + """Batch SST output matches single-image output per element.""" + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse 
test image not found") + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_height = output_width = 640 + batch_size = 4 + img_resized = cv2.resize(img, (output_width, output_height)) + + batch_result = _run_sst_kernel( + img_resized, output_height, output_width, kernel_key, batch_size=batch_size + ) + single_result = _run_sst_kernel(img_resized, output_height, output_width, kernel_key) + + for i in range(batch_size): + assert np.allclose(batch_result[i], single_result[0]) diff --git a/tests/image/onnx/test_preproc_engine.py b/tests/image/onnx/test_preproc_engine.py new file mode 100644 index 00000000..ac03cf2a --- /dev/null +++ b/tests/image/onnx/test_preproc_engine.py @@ -0,0 +1,210 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc,import-untyped" +""" +Tests for TRT preprocessing ONNX model engines. + +Port from: tests/legacy/image/onnx/test_image_preproc.py +""" + +from __future__ import annotations + +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +from trtutils import TRTEngine +from trtutils.image.onnx_models import build_image_preproc, build_image_preproc_imagenet +from trtutils.image.preprocessors import preprocess + +_TRT_VERSION: str | None = None +try: + import tensorrt as _trt_module # type: ignore[import-untyped] + + _TRT_VERSION = str(_trt_module.__version__) +except ImportError: + _trt_module = None # type: ignore[assignment] + +_DATA_DIR = Path(__file__).parent.parent.parent.parent / "data" +_HORSE_IMAGE_PATH = _DATA_DIR / "horse.jpg" + + +def _trt_available() -> bool: + """Check if TensorRT is available.""" + return _TRT_VERSION is not None + + +class TestTRTPreprocEngine: + """Tests for TRT preprocessing ONNX engines.""" + + def test_trt_preproc_matches_cpu(self) -> None: + """TRT preprocessing engine output matches CPU preprocessing.""" + if not _trt_available(): + pytest.skip("TensorRT not 
available") + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_shape = 640 + o_range = (0.0, 1.0) + scale = o_range[1] / 255.0 + offset = o_range[0] + + img = cv2.resize(img, (output_shape, output_shape)) # type: ignore[arg-type] + + cpu_result, _, _ = preprocess( + [img], (output_shape, output_shape), np.dtype(np.float32), input_range=o_range + ) + cpu_result = cpu_result[0] + + try: + engine_path = build_image_preproc( + (output_shape, output_shape), + np.dtype(np.float32), + trt_version=str(_TRT_VERSION), + ) + except RuntimeError as e: + if "Failed to build engine" in str(e): + pytest.skip(f"TRT cannot build for this GPU: {e}") + raise + engine = TRTEngine(engine_path) + engine.mock_execute() + + all_result = engine.execute( + [ + img, + np.array((scale,), dtype=np.float32), + np.array((offset,), dtype=np.float32), + ] + ) + trt_result = all_result[0] + if trt_result.ndim == 4: + trt_result = trt_result[0] + + assert trt_result.shape == cpu_result.shape + assert trt_result.dtype == cpu_result.dtype + assert np.min(trt_result) >= 0.0 + assert np.max(trt_result) <= 1.0 + + diff_mask = np.any(cpu_result != trt_result, axis=-1) + avg_diff = np.mean(np.abs(cpu_result[diff_mask] - trt_result[diff_mask])) + assert avg_diff < 0.0001, f"avg diff: {avg_diff}" + assert np.allclose(trt_result, cpu_result, rtol=5e-4, atol=5e-4) + + del engine + + def test_trt_preproc_imagenet_matches_cpu(self) -> None: + """TRT ImageNet preprocessing engine output matches CPU preprocessing.""" + if not _trt_available(): + pytest.skip("TensorRT not available") + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_shape = 640 + mean = (0.485, 0.456, 0.406) + std = (0.229, 0.224, 0.225) + + img = 
cv2.resize(img, (output_shape, output_shape)) # type: ignore[arg-type] + + cpu_result, _, _ = preprocess( + [img], + (output_shape, output_shape), + np.dtype(np.float32), + input_range=(0.0, 1.0), + mean=mean, + std=std, + ) + cpu_result = cpu_result[0] + + try: + engine_path = build_image_preproc_imagenet( + (output_shape, output_shape), + np.dtype(np.float32), + trt_version=str(_TRT_VERSION), + ) + except RuntimeError as e: + if "Failed to build engine" in str(e): + pytest.skip(f"TRT cannot build for this GPU: {e}") + raise + engine = TRTEngine(engine_path) + engine.mock_execute() + + mean_array = np.array(mean, dtype=np.float32).reshape(1, 3, 1, 1) + std_array = np.array(std, dtype=np.float32).reshape(1, 3, 1, 1) + + all_result = engine.execute([img, mean_array, std_array]) + trt_result = all_result[0] + if trt_result.ndim == 4: + trt_result = trt_result[0] + + assert trt_result.shape == cpu_result.shape + assert trt_result.dtype == cpu_result.dtype + + # ImageNet normalization engine uses fp16, so use relaxed tolerance + assert np.allclose(trt_result, cpu_result, rtol=2e-3, atol=2e-3) + + del engine + + def test_numerical_tolerance(self) -> None: + """TRT preproc engine meets expected numerical tolerance bounds.""" + if not _trt_available(): + pytest.skip("TensorRT not available") + if not _HORSE_IMAGE_PATH.exists(): + pytest.skip("Horse test image not found") + + img = cv2.imread(str(_HORSE_IMAGE_PATH)) + if img is None: + pytest.skip("Failed to read test image") + + output_shape = 640 + o_range = (0.0, 1.0) + scale = o_range[1] / 255.0 + offset = o_range[0] + img = cv2.resize(img, (output_shape, output_shape)) # type: ignore[arg-type] + + cpu_result, _, _ = preprocess( + [img], (output_shape, output_shape), np.dtype(np.float32), input_range=o_range + ) + cpu_result = cpu_result[0] + + try: + engine_path = build_image_preproc( + (output_shape, output_shape), + np.dtype(np.float32), + trt_version=str(_TRT_VERSION), + ) + except RuntimeError as e: + if "Failed 
to build engine" in str(e): + pytest.skip(f"TRT cannot build for this GPU: {e}") + raise + engine = TRTEngine(engine_path) + engine.mock_execute() + + all_result = engine.execute( + [ + img, + np.array((scale,), dtype=np.float32), + np.array((offset,), dtype=np.float32), + ] + ) + trt_result = all_result[0] + if trt_result.ndim == 4: + trt_result = trt_result[0] + + cpu_mean = np.mean(cpu_result) + trt_mean = np.mean(trt_result) + assert cpu_mean * 0.99 <= trt_mean <= cpu_mean * 1.01, ( + f"CPU mean: {cpu_mean}, TRT mean: {trt_mean}" + ) + + del engine diff --git a/tests/image/test_classifier.py b/tests/image/test_classifier.py new file mode 100644 index 00000000..7341357c --- /dev/null +++ b/tests/image/test_classifier.py @@ -0,0 +1,96 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +"""Tests for the Classifier class.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + import numpy as np + +BASE_DIR = Path(__file__).parent.parent.parent +DATA_DIR = BASE_DIR / "data" +# Classifier models are optional - skip if not available +CLASSIFIER_ONNX = DATA_DIR / "onnx" / "resnet18.onnx" + + +@pytest.fixture(scope="module") +def classifier_engine(build_test_engine) -> Path: + """Build and cache a classifier engine.""" + if not CLASSIFIER_ONNX.exists(): + pytest.skip("Classifier ONNX not available (resnet18.onnx)") + return build_test_engine(CLASSIFIER_ONNX) + + +class TestClassifierInference: + """Test Classifier inference.""" + + def test_run_single_image(self, classifier_engine: Path, images) -> None: + """run() with single image returns outputs.""" + horse_image = images["horse"].array + from trtutils.image import Classifier + + clf = Classifier(classifier_engine, warmup=False) + results = clf.run([horse_image], postprocess=False) + assert isinstance(results, list) + assert len(results) > 0 + + def 
test_run_batch(self, classifier_engine: Path, test_images: list[np.ndarray]) -> None: + """run() with batch returns outputs for each image.""" + from trtutils.image import Classifier + + clf = Classifier(classifier_engine, warmup=False) + results = clf.run(test_images, postprocess=False) + assert isinstance(results, list) + + def test_end2end(self, classifier_engine: Path, images) -> None: + """end2end() returns classification results.""" + horse_image = images["horse"].array + from trtutils.image import Classifier + + clf = Classifier(classifier_engine, warmup=False) + classifications = clf.end2end([horse_image]) + assert isinstance(classifications, list) + assert len(classifications) == 1 + + def test_get_classifications(self, classifier_engine: Path, images) -> None: + """get_classifications() returns top-k results.""" + horse_image = images["horse"].array + from trtutils.image import Classifier + + clf = Classifier(classifier_engine, warmup=False) + postprocessed = clf.run([horse_image], postprocess=True) + classifications = clf.get_classifications(postprocessed, top_k=5) + assert len(classifications) >= 1 + + +class TestClassifierPostprocessing: + """Test Classifier postprocessing.""" + + def test_postprocess_returns_probabilities(self, classifier_engine: Path, images) -> None: + """Postprocessed output probabilities sum to approximately 1.""" + horse_image = images["horse"].array + from trtutils.image import Classifier + + clf = Classifier(classifier_engine, warmup=False) + raw = clf.run([horse_image], postprocess=False) + processed = clf.postprocess(raw) + # Softmax probabilities should sum to ~1 + assert isinstance(processed, list) + + def test_top_k_limits(self, classifier_engine: Path, images) -> None: + """top_k parameter controls number of results in end2end.""" + horse_image = images["horse"].array + from trtutils.image import Classifier + + clf = Classifier(classifier_engine, warmup=False) + results_5 = clf.end2end([horse_image], top_k=5) + results_1 
= clf.end2end([horse_image], top_k=1) + assert isinstance(results_5, list) + assert isinstance(results_1, list) diff --git a/tests/image/test_depth_estimator.py b/tests/image/test_depth_estimator.py new file mode 100644 index 00000000..5d6f3c89 --- /dev/null +++ b/tests/image/test_depth_estimator.py @@ -0,0 +1,71 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +"""Tests for the DepthEstimator class.""" + +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pytest + +BASE_DIR = Path(__file__).parent.parent.parent +DATA_DIR = BASE_DIR / "data" +# Depth estimator models are optional - skip if not available +DEPTH_ONNX = DATA_DIR / "onnx" / "depth_anything_v2_small.onnx" + + +@pytest.fixture(scope="module") +def depth_engine(build_test_engine) -> Path: + """Build and cache a depth estimator engine.""" + if not DEPTH_ONNX.exists(): + pytest.skip("Depth estimator ONNX not available") + return build_test_engine(DEPTH_ONNX) + + +class TestDepthEstimatorInference: + """Test DepthEstimator inference.""" + + def test_run_single_image(self, depth_engine: Path, images) -> None: + """run() with single image returns outputs.""" + horse_image = images["horse"].array + from trtutils.image import DepthEstimator + + model = DepthEstimator(depth_engine, warmup=False) + results = model.run([horse_image], postprocess=False) + assert isinstance(results, list) + assert len(results) > 0 + + def test_run_batch(self, depth_engine: Path, test_images: list[np.ndarray]) -> None: + """run() with batch returns outputs.""" + from trtutils.image import DepthEstimator + + model = DepthEstimator(depth_engine, warmup=False) + results = model.run(test_images, postprocess=False) + assert isinstance(results, list) + + def test_output_is_depth_map(self, depth_engine: Path, images) -> None: + """end2end() returns depth maps with spatial dimensions.""" + horse_image = images["horse"].array + from 
trtutils.image import DepthEstimator + + model = DepthEstimator(depth_engine, warmup=False) + depth_maps = model.end2end([horse_image]) + assert isinstance(depth_maps, list) + assert len(depth_maps) == 1 + depth = depth_maps[0] + assert isinstance(depth, np.ndarray) + assert depth.ndim >= 2 # spatial dimensions preserved + + def test_depth_values_positive(self, depth_engine: Path, images) -> None: + """Depth values should be positive (distance from camera).""" + horse_image = images["horse"].array + from trtutils.image import DepthEstimator + + model = DepthEstimator(depth_engine, warmup=False) + depth_maps = model.end2end([horse_image]) + depth = depth_maps[0] + # Depth values should be >= 0 + assert depth.min() >= 0.0 diff --git a/tests/image/test_detector.py b/tests/image/test_detector.py new file mode 100644 index 00000000..0a57119b --- /dev/null +++ b/tests/image/test_detector.py @@ -0,0 +1,110 @@ +# Copyright (c) 2024-2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +"""Tests for the Detector class.""" + +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pytest + +BASE_DIR = Path(__file__).parent.parent.parent +DATA_DIR = BASE_DIR / "data" +YOLOV10_ONNX = DATA_DIR / "yolov10" / "yolov10n_640.onnx" + + +@pytest.fixture(scope="module") +def yolov10_engine(build_test_engine) -> Path: + """Build and cache a YOLOv10n engine for the test module.""" + if not YOLOV10_ONNX.exists(): + pytest.skip("yolov10n_640.onnx not available") + return build_test_engine(YOLOV10_ONNX) + + +class TestDetectorInference: + """Test Detector inference modes.""" + + def test_run_returns_outputs(self, yolov10_engine: Path, images) -> None: + """run() with postprocess=False returns list of raw output arrays.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + results = det.run([horse_image], postprocess=False) + assert 
isinstance(results, list) + assert len(results) > 0 + assert isinstance(results[0], np.ndarray) + + def test_run_with_postprocess(self, yolov10_engine: Path, images) -> None: + """run() with postprocess=True returns postprocessed results.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + results = det.run([horse_image], postprocess=True) + assert isinstance(results, list) + + @pytest.mark.parametrize("preprocessor", ["cpu", "cuda", "trt"]) + def test_preprocessor_variants(self, yolov10_engine: Path, images, preprocessor: str) -> None: + """All preprocessors produce valid outputs.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, preprocessor=preprocessor, warmup=False) + results = det.run([horse_image], postprocess=False) + assert isinstance(results, list) + assert len(results) > 0 + + +class TestDetectorEnd2End: + """Test Detector end2end pipeline.""" + + def test_end2end_single(self, yolov10_engine: Path, images) -> None: + """end2end() with single image returns list of detections.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + detections = det.end2end([horse_image]) + assert isinstance(detections, list) + assert len(detections) == 1 + + def test_end2end_returns_detections(self, yolov10_engine: Path, images) -> None: + """end2end() returns list[list[tuple]] structure.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + detections = det.end2end([horse_image]) + # detections[0] is a list of (bbox, score, class_id) tuples + assert isinstance(detections, list) + for det_list in detections: + assert isinstance(det_list, list) + for d in det_list: + assert len(d) == 3 + + +class TestDetectorBatch: + """Test Detector batch processing.""" + + def 
test_batch_processing_single(self, yolov10_engine: Path, images) -> None: + """Single-image batch inference runs correctly.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + results = det.run([horse_image], postprocess=False) + assert isinstance(results, list) + assert len(results) > 0 + + def test_batch_end2end(self, yolov10_engine: Path, images) -> None: + """end2end returns one detection list per image.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + detections = det.end2end([horse_image]) + assert len(detections) == 1 diff --git a/tests/image/test_image_model.py b/tests/image/test_image_model.py new file mode 100644 index 00000000..3a6a013b --- /dev/null +++ b/tests/image/test_image_model.py @@ -0,0 +1,113 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +"""Tests for ImageModel base class functionality.""" + +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pytest + +BASE_DIR = Path(__file__).parent.parent.parent +DATA_DIR = BASE_DIR / "data" +YOLOV10_ONNX = DATA_DIR / "yolov10" / "yolov10n_640.onnx" + + +# --------------------------------------------------------------------------- +# GPU tests — require a real engine +# --------------------------------------------------------------------------- +@pytest.fixture(scope="module") +def yolov10_engine(build_test_engine) -> Path: + """Build and cache a YOLOv10n engine for the test module.""" + if not YOLOV10_ONNX.exists(): + pytest.skip("yolov10n_640.onnx not available") + return build_test_engine(YOLOV10_ONNX) + + +class TestImageModelInit: + """Test ImageModel initialization with various options.""" + + @pytest.mark.parametrize("preprocessor", ["cpu", "cuda", "trt"]) + def test_init_with_preprocessor_types(self, yolov10_engine: Path, 
preprocessor: str) -> None: + """All 3 preprocessor backends initialize correctly.""" + from trtutils.models import YOLOv10 + + model = YOLOv10(yolov10_engine, preprocessor=preprocessor, warmup=False) + assert model is not None + + @pytest.mark.parametrize("resize_method", ["linear", "letterbox"]) + def test_init_with_resize_methods(self, yolov10_engine: Path, resize_method: str) -> None: + """Both resize methods initialize correctly.""" + from trtutils.models import YOLOv10 + + model = YOLOv10(yolov10_engine, resize_method=resize_method, warmup=False) + assert model is not None + + @pytest.mark.parametrize("backend", ["auto"]) + def test_init_with_backends(self, yolov10_engine: Path, backend: str) -> None: + """Supported execution backends initialize correctly.""" + from trtutils.models import YOLOv10 + + model = YOLOv10(yolov10_engine, backend=backend, warmup=False) + assert model is not None + + +class TestImageModelPreprocessing: + """Test ImageModel preprocessing.""" + + def test_preprocess_single_image(self, yolov10_engine: Path, images) -> None: + """Preprocessing single np.ndarray input produces correct shape.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + model = YOLOv10(yolov10_engine, warmup=False) + result, _, _ = model.preprocess([horse_image]) + assert isinstance(result, np.ndarray) + assert result.ndim == 4 # (batch, C, H, W) + assert result.shape[0] == 1 + + def test_preprocess_batch(self, yolov10_engine: Path, test_images: list[np.ndarray]) -> None: + """Preprocessing list input produces batch output.""" + from trtutils.models import YOLOv10 + + # Use TRT preprocessor with single image (engine has static batch=1) + model = YOLOv10(yolov10_engine, preprocessor="cpu", warmup=False) + images = test_images[:2] + result, _, _ = model.preprocess(images) + assert isinstance(result, np.ndarray) + assert result.shape[0] == len(images) + + def test_preprocess_output_shape(self, yolov10_engine: Path, images) -> None: + 
"""Preprocessed output has correct (1, 3, 640, 640) shape.""" + horse_image = images["horse"].array + from trtutils.models import YOLOv10 + + model = YOLOv10(yolov10_engine, warmup=False) + result, _, _ = model.preprocess([horse_image]) + assert result.shape == (1, 3, 640, 640) + + +class TestImageModelUtilities: + """Test ImageModel utility methods.""" + + def test_get_random_input(self, yolov10_engine: Path) -> None: + """get_random_input generates valid random image tensors.""" + from trtutils.models import YOLOv10 + + model = YOLOv10(yolov10_engine, warmup=False) + rand_input = model.get_random_input() + # Returns list[np.ndarray] (one per engine input) + assert isinstance(rand_input, list) + assert len(rand_input) > 0 + assert isinstance(rand_input[0], np.ndarray) + + def test_mock_run(self, yolov10_engine: Path) -> None: + """Engine mock_execute runs without error.""" + from trtutils.models import YOLOv10 + + model = YOLOv10(yolov10_engine, warmup=False) + engine = model.engine # public property + engine.mock_execute() # should not raise diff --git a/tests/image/test_postproc.py b/tests/image/test_postproc.py new file mode 100644 index 00000000..fe6d7769 --- /dev/null +++ b/tests/image/test_postproc.py @@ -0,0 +1,469 @@ +# Copyright (c) 2025-2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +from __future__ import annotations + +from typing import Callable, List, Tuple + +import numpy as np +import pytest + +from trtutils.image.postprocessors import ( + get_classifications, + get_detections, + postprocess_classifications, + postprocess_detr, + postprocess_efficient_nms, + postprocess_rfdetr, + postprocess_yolov10, +) + +RatiosPaddingFactory = Callable[..., Tuple[List[Tuple[float, float]], List[Tuple[float, float]]]] +YoloOutputFactory = Callable[..., List[np.ndarray]] +EfficientNmsOutputFactory = Callable[..., List[np.ndarray]] +RfdetrOutputFactory = Callable[..., List[np.ndarray]] +DetrOutputFactory = 
Callable[..., List[np.ndarray]] +ClassificationOutputFactory = Callable[..., List[np.ndarray]] + + +class TestYOLOv10Postproc: + """Test YOLOv10 postprocessing helpers.""" + + def test_single_image( + self, make_yolov10_output: YoloOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Postprocess a single image output.""" + outputs = make_yolov10_output(batch_size=1, num_dets=5) + ratios, padding = make_ratios_padding(1) + results = postprocess_yolov10(outputs, ratios, padding) + assert len(results) == 1 + assert len(results[0]) == 3 + assert results[0][0].shape[1] == 4 + assert len(results[0][1]) == len(results[0][0]) + assert len(results[0][2]) == len(results[0][0]) + + @pytest.mark.parametrize("batch_size", [2, 4]) + def test_batch( + self, + make_yolov10_output: YoloOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + batch_size: int, + ) -> None: + """Postprocess batch outputs.""" + outputs = make_yolov10_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + results = postprocess_yolov10(outputs, ratios, padding) + assert len(results) == batch_size + for result in results: + assert len(result) == 3 + + def test_batch_parity( + self, make_yolov10_output: YoloOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Batch postprocess matches per-image postprocess.""" + batch_size = 3 + outputs = make_yolov10_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + batch_results = postprocess_yolov10(outputs, ratios, padding) + for i in range(batch_size): + single_outputs = [out[i : i + 1] for out in outputs] + single_results = postprocess_yolov10(single_outputs, [ratios[i]], [padding[i]]) + assert len(single_results) == 1 + np.testing.assert_array_almost_equal( + batch_results[i][0], single_results[0][0], decimal=5 + ) + np.testing.assert_array_almost_equal( + batch_results[i][1], single_results[0][1], decimal=5 + ) + 
np.testing.assert_array_equal(batch_results[i][2], single_results[0][2]) + + def test_confidence_threshold( + self, make_yolov10_output: YoloOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Confidence threshold filters detections.""" + outputs = make_yolov10_output(batch_size=2, num_dets=10) + ratios, padding = make_ratios_padding(2) + results_filtered = postprocess_yolov10(outputs, ratios, padding, conf_thres=0.8) + results_unfiltered = postprocess_yolov10(outputs, ratios, padding, conf_thres=None) + for i in range(2): + assert len(results_filtered[i][0]) <= len(results_unfiltered[i][0]) + + def test_empty_detections(self, make_ratios_padding: RatiosPaddingFactory) -> None: + """Empty detections produce empty arrays.""" + outputs = [np.zeros((2, 300, 6), dtype=np.float32)] + ratios, padding = make_ratios_padding(2) + results = postprocess_yolov10(outputs, ratios, padding, conf_thres=0.5) + assert len(results) == 2 + for result in results: + assert len(result[0]) == 0 + + +class TestEfficientNMSPostproc: + """Test EfficientNMS postprocessing helpers.""" + + def test_single_image( + self, + make_efficient_nms_output: EfficientNmsOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + ) -> None: + """Postprocess a single image output.""" + outputs = make_efficient_nms_output(batch_size=1, num_dets=5) + ratios, padding = make_ratios_padding(1) + results = postprocess_efficient_nms(outputs, ratios, padding) + assert len(results) == 1 + assert len(results[0]) == 3 + + @pytest.mark.parametrize("batch_size", [2, 4]) + def test_batch( + self, + make_efficient_nms_output: EfficientNmsOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + batch_size: int, + ) -> None: + """Postprocess batch outputs.""" + outputs = make_efficient_nms_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + results = postprocess_efficient_nms(outputs, ratios, padding) + assert len(results) == batch_size + for result 
in results: + assert len(result) == 3 + + def test_batch_parity( + self, + make_efficient_nms_output: EfficientNmsOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + ) -> None: + """Batch postprocess matches per-image postprocess.""" + batch_size = 3 + outputs = make_efficient_nms_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + batch_results = postprocess_efficient_nms(outputs, ratios, padding) + for i in range(batch_size): + single_outputs = [ + outputs[0][i : i + 1], + outputs[1][i : i + 1], + outputs[2][i : i + 1], + outputs[3][i : i + 1], + ] + single_results = postprocess_efficient_nms(single_outputs, [ratios[i]], [padding[i]]) + assert len(single_results) == 1 + np.testing.assert_array_almost_equal( + batch_results[i][0], single_results[0][0], decimal=5 + ) + + def test_zero_detections(self, make_ratios_padding: RatiosPaddingFactory) -> None: + """Zero detections produce empty results.""" + batch_size = 2 + num_dets_arr = np.zeros((batch_size,), dtype=np.int32) + bboxes = np.zeros((batch_size, 100, 4), dtype=np.float32) + scores = np.zeros((batch_size, 100), dtype=np.float32) + class_ids = np.zeros((batch_size, 100), dtype=np.float32) + outputs = [num_dets_arr, bboxes, scores, class_ids] + ratios, padding = make_ratios_padding(batch_size) + results = postprocess_efficient_nms(outputs, ratios, padding) + assert len(results) == batch_size + for result in results: + assert len(result[0]) == 0 + + +class TestRFDETRPostproc: + """Test RF-DETR postprocessing helpers.""" + + def test_single_image( + self, make_rfdetr_output: RfdetrOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Postprocess a single image output.""" + outputs = make_rfdetr_output(batch_size=1, num_dets=5) + ratios, padding = make_ratios_padding(1) + results = postprocess_rfdetr(outputs, ratios, padding, input_size=(640, 640)) + assert len(results) == 1 + assert len(results[0]) == 3 + + 
@pytest.mark.parametrize("batch_size", [2, 4]) + def test_batch( + self, + make_rfdetr_output: RfdetrOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + batch_size: int, + ) -> None: + """Postprocess batch outputs.""" + outputs = make_rfdetr_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + results = postprocess_rfdetr(outputs, ratios, padding, input_size=(640, 640)) + assert len(results) == batch_size + for result in results: + assert len(result) == 3 + + def test_batch_parity( + self, make_rfdetr_output: RfdetrOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Batch postprocess matches per-image postprocess.""" + batch_size = 3 + outputs = make_rfdetr_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + batch_results = postprocess_rfdetr(outputs, ratios, padding, input_size=(640, 640)) + for i in range(batch_size): + single_outputs = [out[i : i + 1] for out in outputs] + single_results = postprocess_rfdetr( + single_outputs, [ratios[i]], [padding[i]], input_size=(640, 640) + ) + assert len(single_results) == 1 + np.testing.assert_array_almost_equal( + batch_results[i][0], single_results[0][0], decimal=5 + ) + + def test_with_input_size( + self, make_rfdetr_output: RfdetrOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """input_size parameter is accepted and used.""" + outputs = make_rfdetr_output(batch_size=2, num_dets=5) + ratios, padding = make_ratios_padding(2) + results = postprocess_rfdetr(outputs, ratios, padding, input_size=(416, 416)) + assert len(results) == 2 + + +class TestDETRPostproc: + """Test DETR postprocessing helpers.""" + + def test_single_image( + self, make_detr_output: DetrOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Postprocess a single image output.""" + outputs = make_detr_output(batch_size=1, num_dets=5) + ratios, padding = make_ratios_padding(1) + results = 
postprocess_detr(outputs, ratios, padding) + assert len(results) == 1 + assert len(results[0]) == 3 + + @pytest.mark.parametrize("batch_size", [2, 4]) + def test_batch( + self, + make_detr_output: DetrOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + batch_size: int, + ) -> None: + """Postprocess batch outputs.""" + outputs = make_detr_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + results = postprocess_detr(outputs, ratios, padding) + assert len(results) == batch_size + for result in results: + assert len(result) == 3 + + def test_batch_parity( + self, make_detr_output: DetrOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Batch postprocess matches per-image postprocess.""" + batch_size = 3 + outputs = make_detr_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + batch_results = postprocess_detr(outputs, ratios, padding) + for i in range(batch_size): + single_outputs = [out[i : i + 1] for out in outputs] + single_results = postprocess_detr(single_outputs, [ratios[i]], [padding[i]]) + assert len(single_results) == 1 + np.testing.assert_array_almost_equal( + batch_results[i][0], single_results[0][0], decimal=5 + ) + + def test_confidence_threshold( + self, make_detr_output: DetrOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Confidence threshold filters detections.""" + outputs = make_detr_output(batch_size=2, num_dets=10) + ratios, padding = make_ratios_padding(2) + results_filtered = postprocess_detr(outputs, ratios, padding, conf_thres=0.8) + results_unfiltered = postprocess_detr(outputs, ratios, padding, conf_thres=None) + for i in range(2): + assert len(results_filtered[i][0]) <= len(results_unfiltered[i][0]) + + +class TestClassificationPostproc: + """Test classification postprocessing helpers.""" + + def test_single_image(self, make_classification_output: ClassificationOutputFactory) -> None: + 
"""Postprocess a single image output.""" + outputs = make_classification_output(batch_size=1) + results = postprocess_classifications(outputs) + assert len(results) == 1 + assert len(results[0]) == 1 + assert results[0][0].shape == (1, 1000) + assert np.isclose(np.sum(results[0][0]), 1.0, rtol=1e-5) + + @pytest.mark.parametrize("batch_size", [2, 4]) + def test_batch( + self, make_classification_output: ClassificationOutputFactory, batch_size: int + ) -> None: + """Postprocess a batch of outputs.""" + outputs = make_classification_output(batch_size=batch_size) + results = postprocess_classifications(outputs) + assert len(results) == batch_size + for result in results: + assert len(result) == 1 + assert np.isclose(np.sum(result[0]), 1.0, rtol=1e-5) + + def test_batch_parity(self, make_classification_output: ClassificationOutputFactory) -> None: + """Batch postprocess matches per-image postprocess.""" + batch_size = 3 + outputs_batch = make_classification_output(batch_size=batch_size) + batch_results = postprocess_classifications([out.copy() for out in outputs_batch]) + for i in range(batch_size): + single_outputs = [out[i : i + 1].copy() for out in outputs_batch] + single_results = postprocess_classifications(single_outputs) + assert len(single_results) == 1 + np.testing.assert_array_almost_equal( + batch_results[i][0], single_results[0][0], decimal=5 + ) + + +class TestGetDetections: + """Test get_detections helper.""" + + def test_single_image( + self, make_yolov10_output: YoloOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Get detections for a single image.""" + outputs = make_yolov10_output(batch_size=1, num_dets=5) + ratios, padding = make_ratios_padding(1) + postprocessed = postprocess_yolov10(outputs, ratios, padding) + detections = get_detections(postprocessed) + assert len(detections) == 1 + assert isinstance(detections[0], list) + for det in detections[0]: + assert len(det) == 3 + assert len(det[0]) == 4 + + 
@pytest.mark.parametrize("batch_size", [2, 4]) + def test_batch( + self, + make_yolov10_output: YoloOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + batch_size: int, + ) -> None: + """Get detections for a batch.""" + outputs = make_yolov10_output(batch_size=batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + postprocessed = postprocess_yolov10(outputs, ratios, padding) + detections = get_detections(postprocessed) + assert len(detections) == batch_size + for image_dets in detections: + assert isinstance(image_dets, list) + + def test_confidence_filtering( + self, make_yolov10_output: YoloOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Confidence threshold filters detections.""" + batch_size = 2 + outputs = make_yolov10_output(batch_size=batch_size, num_dets=10) + ratios, padding = make_ratios_padding(batch_size) + postprocessed = postprocess_yolov10(outputs, ratios, padding) + dets_filtered = get_detections(postprocessed, conf_thres=0.8) + dets_unfiltered = get_detections(postprocessed, conf_thres=None) + for i in range(batch_size): + assert len(dets_filtered[i]) <= len(dets_unfiltered[i]) + + def test_structure( + self, make_yolov10_output: YoloOutputFactory, make_ratios_padding: RatiosPaddingFactory + ) -> None: + """Each detection is (bbox, score, class_id).""" + outputs = make_yolov10_output(1, num_dets=3) + ratios, padding = make_ratios_padding(1) + postprocessed = postprocess_yolov10(outputs, ratios, padding) + detections = get_detections(postprocessed) + for det in detections[0]: + bbox, score, class_id = det + assert all(isinstance(coord, int) for coord in bbox) + assert isinstance(score, float) + assert isinstance(class_id, int) + + +class TestGetClassifications: + """Test get_classifications helper.""" + + def test_single_image(self, make_classification_output: ClassificationOutputFactory) -> None: + """Get classifications for a single image.""" + outputs = 
make_classification_output(batch_size=1) + postprocessed = postprocess_classifications(outputs) + classifications = get_classifications(postprocessed, top_k=5) + assert len(classifications) == 1 + assert len(classifications[0]) == 5 + for class_id, confidence in classifications[0]: + assert isinstance(class_id, int) + assert isinstance(confidence, float) + assert 0.0 <= confidence <= 1.0 + + @pytest.mark.parametrize("batch_size", [2, 4]) + def test_batch( + self, make_classification_output: ClassificationOutputFactory, batch_size: int + ) -> None: + """Get classifications for a batch.""" + outputs = make_classification_output(batch_size=batch_size) + postprocessed = postprocess_classifications(outputs) + classifications = get_classifications(postprocessed, top_k=5) + assert len(classifications) == batch_size + for image_classifications in classifications: + assert len(image_classifications) == 5 + + @pytest.mark.parametrize("top_k", [1, 3, 10]) + def test_top_k( + self, make_classification_output: ClassificationOutputFactory, top_k: int + ) -> None: + """Top-k parameter controls number of results.""" + outputs = make_classification_output(batch_size=2) + postprocessed = postprocess_classifications(outputs) + classifications = get_classifications(postprocessed, top_k=top_k) + for image_classifications in classifications: + assert len(image_classifications) == top_k + + +class TestDifferentRatiosPerImage: + """Test varying ratios and padding per image.""" + + def test_varying_ratios_affect_output(self, make_yolov10_output: YoloOutputFactory) -> None: + """Different ratios affect outputs differently.""" + batch_size = 2 + outputs = make_yolov10_output(batch_size=batch_size, num_dets=3) + ratios = [(1.0, 1.0), (2.0, 2.0)] + padding = [(0.0, 0.0), (10.0, 10.0)] + results = postprocess_yolov10(outputs, ratios, padding) + assert len(results) == batch_size + if len(results[0][0]) > 0 and len(results[1][0]) > 0: + assert not np.allclose(results[0][0], results[1][0]) + + 
+class TestOutputStructure: + """Validate postprocessor output structures.""" + + def test_detection_structure( + self, + make_yolov10_output: YoloOutputFactory, + make_ratios_padding: RatiosPaddingFactory, + ) -> None: + """Postprocessed detections contain expected arrays.""" + batch_size = 2 + outputs = make_yolov10_output(batch_size, num_dets=5) + ratios, padding = make_ratios_padding(batch_size) + postprocessed = postprocess_yolov10(outputs, ratios, padding) + for result in postprocessed: + bboxes, scores, class_ids = result + assert isinstance(bboxes, np.ndarray) + assert isinstance(scores, np.ndarray) + assert isinstance(class_ids, np.ndarray) + assert bboxes.ndim == 2 + assert bboxes.shape[1] == 4 + assert len(scores) == len(bboxes) + assert len(class_ids) == len(bboxes) + + def test_classification_structure( + self, make_classification_output: ClassificationOutputFactory + ) -> None: + """Postprocessed classifications contain normalized probabilities.""" + batch_size = 2 + outputs = make_classification_output(batch_size) + postprocessed = postprocess_classifications(outputs) + for result in postprocessed: + assert len(result) >= 1 + probs = result[0] + assert isinstance(probs, np.ndarray) + assert np.isclose(np.sum(probs), 1.0, rtol=1e-5) diff --git a/tests/image/test_preproc.py b/tests/image/test_preproc.py new file mode 100644 index 00000000..57ead052 --- /dev/null +++ b/tests/image/test_preproc.py @@ -0,0 +1,363 @@ +# Copyright (c) 2025-2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +from __future__ import annotations + +import time +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np +import pytest + +from trtutils.image.preprocessors import ( + CPUPreprocessor, + CUDAPreprocessor, + TRTPreprocessor, +) + +from .conftest import ( + CUDA_MAG_BOUNDS, + IMAGENET_MEAN, + IMAGENET_STD, + PREPROC_DTYPE, + PREPROC_RANGE, + PREPROC_SIZE, +) + +if TYPE_CHECKING: + from 
collections.abc import Callable + +_DATA_DIR = Path(__file__).parent.parent.parent / "data" +_HORSE_IMAGE_PATH = _DATA_DIR / "horse.jpg" +_PEOPLE_IMAGE_PATH = _DATA_DIR / "people.jpeg" +_IMAGE_PATHS = [_HORSE_IMAGE_PATH, _PEOPLE_IMAGE_PATH] + + +def _read_image(path: Path) -> np.ndarray: + import cv2 + + img = cv2.imread(str(path)) + if img is None: + pytest.skip(f"Test image not found: {path}") + return img + + +class TestPreprocessorLoads: + """Ensure preprocessors initialize correctly.""" + + @pytest.mark.parametrize("ptype", ["cpu", "cuda", "trt"]) + def test_load_without_normalization( + self, + make_preprocessor: Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor], + ptype: str, + ) -> None: + """Preprocessors load with default settings (no mean/std).""" + preproc = make_preprocessor(ptype) + assert preproc + + @pytest.mark.parametrize("ptype", ["cpu", "cuda", "trt"]) + def test_load_with_imagenet_normalization( + self, + make_preprocessor: Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor], + ptype: str, + ) -> None: + """Preprocessors load with mean/std normalization.""" + preproc = make_preprocessor(ptype, mean=IMAGENET_MEAN, std=IMAGENET_STD) + assert preproc + + +class TestPreprocessorDeterminism: + """Verify deterministic preprocessing behavior.""" + + @pytest.mark.parametrize("ptype", ["cpu", "cuda", "trt"]) + def test_same_input_same_output( + self, + make_preprocessor: Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor], + ptype: str, + images, + ) -> None: + """Preprocessing same image yields identical results.""" + horse_image = images["horse"].array + preproc = make_preprocessor(ptype) + result1 = preproc.preprocess([horse_image])[0] + result2 = preproc.preprocess([horse_image])[0] + assert np.array_equal(result1, result2) + + @pytest.mark.parametrize("ptype", ["cpu", "cuda", "trt"]) + def test_same_input_same_output_imagenet( + self, + make_preprocessor: Callable[..., CPUPreprocessor | 
CUDAPreprocessor | TRTPreprocessor],
+        ptype: str,
+        images,
+    ) -> None:
+        """Preprocessing with mean/std yields identical results."""
+        horse_image = images["horse"].array
+        preproc = make_preprocessor(ptype, mean=IMAGENET_MEAN, std=IMAGENET_STD)
+        result1 = preproc.preprocess([horse_image])[0]
+        result2 = preproc.preprocess([horse_image])[0]
+        assert np.array_equal(result1, result2)
+
+
+class TestPreprocessorParity:
+    """Check CPU/GPU preprocessor parity."""
+
+    def _assess_parity(
+        self,
+        preproc1: CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor,
+        tag1: str,
+        preproc2: CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor,
+        tag2: str,
+        method: str,
+    ) -> None:
+        """Assert preprocessing outputs match across backends."""
+        for img_path in _IMAGE_PATHS:
+            img = _read_image(img_path)
+            result1, ratios1_list, padding1_list = preproc1.preprocess([img], resize=method)
+            result2, ratios2_list, padding2_list = preproc2.preprocess([img], resize=method)
+            ratios1, ratios2 = ratios1_list[0], ratios2_list[0]
+            padding1, padding2 = padding1_list[0], padding2_list[0]
+            assert ratios1 == ratios2
+            assert padding1 == padding2
+            assert result1.shape == result2.shape, (
+                f"{tag1}: {result1.shape} != {tag2}: {result2.shape}"
+            )
+            assert result1.dtype == result2.dtype, (
+                f"{tag1}: {result1.dtype} != {tag2}: {result2.dtype}"
+            )
+            cpu_mean = np.mean(result1)
+            other_mean = np.mean(result2)
+            assert cpu_mean - CUDA_MAG_BOUNDS <= other_mean <= cpu_mean + CUDA_MAG_BOUNDS, (
+                f"{tag1}: {cpu_mean} != {tag2}: {other_mean}"
+            )
+            diff_mask = np.any(result1 != result2, axis=-1)
+            avg_diff = np.mean(np.abs(result1[diff_mask] - result2[diff_mask])) if np.any(diff_mask) else 0.0  # np.mean([]) is nan, which would fail the assert on perfectly matching outputs
+            assert avg_diff < 1.0, f"{tag1} != {tag2}: {avg_diff}"
+
+    @pytest.mark.parametrize("ptype", ["cuda", "trt"])
+    @pytest.mark.parametrize("method", ["linear", "letterbox"])
+    def test_gpu_matches_cpu(
+        self,
+        make_preprocessor: Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor],
+        ptype: str,
+        method: str,
+    ) 
-> None: + """GPU preprocessing matches CPU preprocessing.""" + cpu = make_preprocessor("cpu") + other = make_preprocessor(ptype) + self._assess_parity(cpu, "CPU", other, ptype.upper(), method) + + @pytest.mark.parametrize("ptype", ["cuda", "trt"]) + @pytest.mark.parametrize("method", ["linear", "letterbox"]) + def test_gpu_matches_cpu_imagenet( + self, + make_preprocessor: Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor], + ptype: str, + method: str, + ) -> None: + """GPU preprocessing matches CPU with ImageNet mean/std.""" + cpu = make_preprocessor("cpu", mean=IMAGENET_MEAN, std=IMAGENET_STD) + other = make_preprocessor(ptype, mean=IMAGENET_MEAN, std=IMAGENET_STD) + self._assess_parity(cpu, "CPU", other, ptype.upper(), method) + + +class TestPreprocessorAPI: + """Validate preprocessor API behavior.""" + + def test_accepts_list_input(self, random_images: Callable[..., list[np.ndarray]]) -> None: + """Preprocessor accepts list input and returns expected types.""" + preproc = CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + images = random_images(3) + result, ratios, padding = preproc.preprocess(images) + assert isinstance(result, np.ndarray) + assert isinstance(ratios, list) + assert isinstance(padding, list) + + def test_output_shape_single(self, random_images: Callable[..., list[np.ndarray]]) -> None: + """Preprocessor returns correct shape for single image.""" + preproc = CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + images = random_images(1) + result, ratios, padding = preproc.preprocess(images) + assert result.shape == (1, 3, 640, 640) + assert len(ratios) == 1 + assert len(padding) == 1 + assert len(ratios[0]) == 2 + assert len(padding[0]) == 2 + + @pytest.mark.parametrize("batch_size", [2, 4]) + def test_output_shape_batch( + self, random_images: Callable[..., list[np.ndarray]], batch_size: int + ) -> None: + """Preprocessor returns correct shape for batch input.""" + preproc = CPUPreprocessor(PREPROC_SIZE, 
PREPROC_RANGE, PREPROC_DTYPE) + images = random_images(batch_size) + result, ratios, padding = preproc.preprocess(images) + assert result.shape == (batch_size, 3, 640, 640) + assert len(ratios) == batch_size + assert len(padding) == batch_size + + def test_output_dtype(self, random_images: Callable[..., list[np.ndarray]]) -> None: + """Preprocessor outputs float32 arrays.""" + preproc = CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + images = random_images(2) + result, _, _ = preproc.preprocess(images) + assert result.dtype == np.float32 + + def test_output_range(self, random_images: Callable[..., list[np.ndarray]]) -> None: + """Preprocessor outputs normalized values in expected range.""" + preproc = CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + images = random_images(2) + result, _, _ = preproc.preprocess(images) + assert result.min() >= 0.0 + assert result.max() <= 1.0 + + def test_ratio_padding_types(self, random_images: Callable[..., list[np.ndarray]]) -> None: + """Preprocessor returns ratios and padding as list of tuples.""" + preproc = CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + images = random_images(2) + _, ratios, padding = preproc.preprocess(images) + for ratio in ratios: + assert isinstance(ratio, tuple) + assert len(ratio) == 2 + assert all(isinstance(v, float) for v in ratio) + for pad in padding: + assert isinstance(pad, tuple) + assert len(pad) == 2 + assert all(isinstance(v, float) for v in pad) + + def test_batch_matches_individual(self, random_images: Callable[..., list[np.ndarray]]) -> None: + """Batch preprocessing matches individual image preprocessing.""" + preproc = CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + rng = np.random.default_rng(42) + images = random_images(3) + images = [rng.integers(0, 255, img.shape, dtype=np.uint8) for img in images] + batch_result, batch_ratios, batch_padding = preproc.preprocess(images) + for i, img in enumerate(images): + single_result, 
single_ratios, single_padding = preproc.preprocess([img]) + np.testing.assert_array_equal(batch_result[i], single_result[0]) + assert batch_ratios[i] == single_ratios[0] + assert batch_padding[i] == single_padding[0] + + +class TestBatchProcessing: + """Validate batch preprocessing behavior.""" + + @pytest.mark.parametrize("ptype", ["cpu", "cuda", "trt"]) + def test_batch_output_shape( + self, + make_preprocessor: Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor], + ptype: str, + test_images: list[np.ndarray], + ) -> None: + """Batch preprocessing preserves shapes and metadata.""" + preproc = make_preprocessor(ptype) + images = test_images[:3] if len(test_images) >= 3 else test_images + result, ratios_list, padding_list = preproc.preprocess(images) + assert result.shape[0] == len(images) + assert result.shape == (len(images), 3, 640, 640) + assert len(ratios_list) == len(images) + assert len(padding_list) == len(images) + + @pytest.mark.parametrize("ptype", ["cuda", "trt"]) + def test_batch_parity_with_single( + self, + make_preprocessor: Callable[..., CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor], + ptype: str, + test_images: list[np.ndarray], + ) -> None: + """Batch preprocessing matches single-image results.""" + preproc = make_preprocessor(ptype) + images = test_images[:3] if len(test_images) >= 3 else test_images + batch_result, batch_ratios, batch_padding = preproc.preprocess(images) + for i, img in enumerate(images): + single_result, single_ratios, single_padding = preproc.preprocess([img]) + assert np.allclose(batch_result[i], single_result[0], rtol=1e-5, atol=1e-5) + assert batch_ratios[i] == single_ratios[0] + assert batch_padding[i] == single_padding[0] + + def test_cuda_dynamic_reallocation(self, images) -> None: + """CUDA preprocessor reallocates for varying batch sizes.""" + horse_image = images["horse"].array + preproc = CUDAPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + result1, _, _ = 
preproc.preprocess([horse_image]) + assert result1.shape[0] == 1 + result3, _, _ = preproc.preprocess([horse_image, horse_image, horse_image]) + assert result3.shape[0] == 3 + result2, _, _ = preproc.preprocess([horse_image, horse_image]) + assert result2.shape[0] == 2 + assert np.allclose(result1[0], result3[0], rtol=1e-5, atol=1e-5) + assert np.allclose(result1[0], result2[0], rtol=1e-5, atol=1e-5) + + +@pytest.mark.performance +class TestPerformance: + """Benchmark preprocessing performance.""" + + def _measure( + self, + images: list[np.ndarray], + preproc: CPUPreprocessor | CUDAPreprocessor | TRTPreprocessor, + ) -> float: + """Measure average preprocessing time over 10 iterations.""" + profs = [] + for _ in range(10): + t0 = time.perf_counter() + preproc.preprocess(images) + t1 = time.perf_counter() + profs.append(t1 - t0) + return float(np.mean(profs)) + + def _run_perf_test(self, gpu_preproc: CUDAPreprocessor | TRTPreprocessor) -> tuple[float, float]: + """Run CPU vs GPU preprocessing timing test.""" + cpu = CPUPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE) + img = _read_image(_HORSE_IMAGE_PATH) + images = [img] + for _ in range(10): + cpu.preprocess(images) + gpu_preproc.preprocess(images) + cpu_time = self._measure(images, cpu) + gpu_time = self._measure(images, gpu_preproc) + if cpu_time <= gpu_time: + pytest.skip("GPU overhead dominated for small input - expected in containers") + return cpu_time, gpu_time + + def test_gpu_faster_than_cpu_cuda(self) -> None: + """CUDA preprocessing is faster than CPU.""" + cuda = CUDAPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE, pagelocked_mem=False) + cpu_time, cuda_time = self._run_perf_test(cuda) + print(f"CPU: {cpu_time:.3f}s, CUDA: {cuda_time:.3f}s, speedup: {cpu_time / cuda_time:.2f}x") + + def test_gpu_pagelocked_faster_cuda(self) -> None: + """CUDA preprocessing speedup with pagelocked memory.""" + cuda = CUDAPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE, pagelocked_mem=True) + 
cpu_time, cuda_time = self._run_perf_test(cuda) + print( + f"Pagelocked - CPU: {cpu_time:.3f}s, CUDA: {cuda_time:.3f}s," + f" speedup: {cpu_time / cuda_time:.2f}x" + ) + + def test_gpu_faster_than_cpu_trt(self) -> None: + """TRT preprocessing is faster than CPU.""" + try: + trt = TRTPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE, pagelocked_mem=False) + except RuntimeError as e: + if "Failed to build engine" in str(e): + pytest.skip(f"TRT cannot build for this GPU: {e}") + raise + cpu_time, trt_time = self._run_perf_test(trt) + print(f"CPU: {cpu_time:.3f}s, TRT: {trt_time:.3f}s, speedup: {cpu_time / trt_time:.2f}x") + + def test_gpu_pagelocked_faster_trt(self) -> None: + """TRT preprocessing speedup with pagelocked memory.""" + try: + trt = TRTPreprocessor(PREPROC_SIZE, PREPROC_RANGE, PREPROC_DTYPE, pagelocked_mem=True) + except RuntimeError as e: + if "Failed to build engine" in str(e): + pytest.skip(f"TRT cannot build for this GPU: {e}") + raise + cpu_time, trt_time = self._run_perf_test(trt) + print( + f"Pagelocked - CPU: {cpu_time:.3f}s, TRT: {trt_time:.3f}s," + f" speedup: {cpu_time / trt_time:.2f}x" + ) diff --git a/tests/image/test_sahi.py b/tests/image/test_sahi.py new file mode 100644 index 00000000..69927729 --- /dev/null +++ b/tests/image/test_sahi.py @@ -0,0 +1,84 @@ +# Copyright (c) 2026 Justin Davis (davisjustin302@gmail.com) +# +# MIT License +# mypy: disable-error-code="misc" +"""Tests for SAHI (Slicing Aided Hyper Inference) integration.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +BASE_DIR = Path(__file__).parent.parent.parent +DATA_DIR = BASE_DIR / "data" +YOLOV10_ONNX = DATA_DIR / "yolov10" / "yolov10n_640.onnx" + + +def _sahi_available() -> bool: + """Check if the trtutils SAHI module is importable.""" + try: + from trtutils.image.sahi import SAHI # noqa: F401 + + return True + except ImportError: + return False + + +@pytest.fixture(scope="module") +def yolov10_engine(build_test_engine) -> 
Path: + """Build and cache a YOLOv10n engine for the test module.""" + if not YOLOV10_ONNX.exists(): + pytest.skip("yolov10n_640.onnx not available") + return build_test_engine(YOLOV10_ONNX) + + +class TestSAHIIntegration: + """Test SAHI slicing and merging integration.""" + + def test_sahi_available(self) -> None: + """SAHI module can be imported from trtutils.image.sahi.""" + if not _sahi_available(): + pytest.skip("SAHI module not available") + from trtutils.image.sahi import SAHI + + assert SAHI is not None + + def test_sahi_init_with_detector(self, yolov10_engine: Path) -> None: + """SAHI can be initialized with a Detector.""" + if not _sahi_available(): + pytest.skip("SAHI module not available") + from trtutils.image.sahi import SAHI + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + sahi = SAHI(det) + assert sahi is not None + + def test_sahi_end2end(self, yolov10_engine: Path, images) -> None: + """SAHI end2end runs without error and returns detections.""" + horse_image = images["horse"].array + if not _sahi_available(): + pytest.skip("SAHI module not available") + from trtutils.image.sahi import SAHI + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + sahi = SAHI(det) + detections = sahi.end2end(horse_image) + assert isinstance(detections, list) + for d in detections: + assert len(d) == 3 # (bbox, score, class_id) + + def test_sahi_with_slice_size(self, yolov10_engine: Path, images) -> None: + """SAHI accepts custom slice_size parameter.""" + horse_image = images["horse"].array + if not _sahi_available(): + pytest.skip("SAHI module not available") + from trtutils.image.sahi import SAHI + from trtutils.models import YOLOv10 + + det = YOLOv10(yolov10_engine, warmup=False) + sahi = SAHI(det, slice_size=(320, 320)) + detections = sahi.end2end(horse_image) + assert isinstance(detections, list)