From 7947b09bd94d76dd3f29a09fd0b96b339188e6a7 Mon Sep 17 00:00:00 2001 From: mykola Date: Tue, 18 Nov 2025 09:33:11 +0300 Subject: [PATCH 1/3] feat: Enable support GPU in YoloOnnxDetector.py --- scaledp/models/detectors/yolo/yolo.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/scaledp/models/detectors/yolo/yolo.py b/scaledp/models/detectors/yolo/yolo.py index fc14b73..23c52cf 100644 --- a/scaledp/models/detectors/yolo/yolo.py +++ b/scaledp/models/detectors/yolo/yolo.py @@ -5,12 +5,13 @@ import numpy as np import onnxruntime +from scaledp.enums import Device from scaledp.models.detectors.yolo.utils import multiclass_nms, xywh2xyxy class YOLO: - def __init__(self, path, conf_thres=0.7, iou_thres=0.5) -> None: + def __init__(self, path, device=Device.CPU, conf_thres=0.7, iou_thres=0.5) -> None: self.conf_threshold = conf_thres self.iou_threshold = iou_thres @@ -22,15 +23,24 @@ def __init__(self, path, conf_thres=0.7, iou_thres=0.5) -> None: self.pad_y = None # Initialize model - self.initialize_model(path) + self.initialize_model(path, device) def __call__(self, image) -> Any: return self.detect_objects(image) - def initialize_model(self, path): - self.session = onnxruntime.InferenceSession( - path, providers=onnxruntime.get_available_providers() + def initialize_model(self, path, device): + provider = ( + "CUDAExecutionProvider" if device == Device.CUDA else "CPUExecutionProvider" ) + + if provider in onnxruntime.get_available_providers(): + providers = [provider] + else: + logging.warning( + f"{provider} is not available. Falling back to CPUExecutionProvider." 
+ ) + providers = ["CPUExecutionProvider"] + self.session = onnxruntime.InferenceSession(path, providers=providers) # Get model info self.get_input_details() self.get_output_details() From a7dd2f447dadb786f61f807c7e8e685940dd5ee8 Mon Sep 17 00:00:00 2001 From: mykola Date: Tue, 18 Nov 2025 09:36:14 +0300 Subject: [PATCH 2/3] update: Change default values for detectors --- cliff.toml | 1 + scaledp/models/detectors/FaceDetector.py | 4 ++-- scaledp/models/detectors/SignatureDetector.py | 4 ++-- scaledp/models/detectors/YoloOnnxDetector.py | 10 +++++++--- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/cliff.toml b/cliff.toml index 2c172e9..1c12bdb 100644 --- a/cliff.toml +++ b/cliff.toml @@ -61,6 +61,7 @@ protect_breaking_commits = false commit_parsers = [ { message = "^feat", group = "🚀 Features" }, { message = "^fix", group = "🐛 Bug Fixes" }, + { message = "^update", group = "🔄 Updates" }, { message = "^doc", group = "📚 Documentation" }, { message = "^perf", group = "⚡ Performance" }, { message = "^maint", group = "🧰 Maintenance" }, diff --git a/scaledp/models/detectors/FaceDetector.py b/scaledp/models/detectors/FaceDetector.py index 85909a0..dddbdec 100644 --- a/scaledp/models/detectors/FaceDetector.py +++ b/scaledp/models/detectors/FaceDetector.py @@ -18,8 +18,8 @@ class FaceDetector(YoloOnnxDetector): "batchSize": 2, "partitionMap": False, "numPartitions": 0, - "pageCol": "page", - "pathCol": "path", + "pageCol": "", + "pathCol": "", "propagateError": False, "task": "detect", "onlyRotated": False, diff --git a/scaledp/models/detectors/SignatureDetector.py b/scaledp/models/detectors/SignatureDetector.py index 4e5bf98..6449aa4 100644 --- a/scaledp/models/detectors/SignatureDetector.py +++ b/scaledp/models/detectors/SignatureDetector.py @@ -16,8 +16,8 @@ class SignatureDetector(YoloOnnxDetector): "batchSize": 2, "partitionMap": False, "numPartitions": 0, - "pageCol": "page", - "pathCol": "path", + "pageCol": "", + "pathCol": "", "propagateError": 
False, "task": "detect", "onlyRotated": False, diff --git a/scaledp/models/detectors/YoloOnnxDetector.py b/scaledp/models/detectors/YoloOnnxDetector.py index 44d94b6..456018f 100644 --- a/scaledp/models/detectors/YoloOnnxDetector.py +++ b/scaledp/models/detectors/YoloOnnxDetector.py @@ -48,8 +48,8 @@ class YoloOnnxDetector(BaseDetector, HasDevice, HasBatchSize, HasLabels): "batchSize": 2, "partitionMap": False, "numPartitions": 0, - "pageCol": "page", - "pathCol": "path", + "pageCol": "", + "pathCol": "", "propagateError": False, "task": "detect", "onlyRotated": False, @@ -83,7 +83,11 @@ def get_model(cls, params): logging.info("Model downloaded") - detector = YOLO(model_path_final, conf_thres=params["scoreThreshold"]) + detector = YOLO( + model_path_final, + conf_thres=params["scoreThreshold"], + device=params["device"], + ) cls._model[model_path] = detector return cls._model[model_path] From ad4d63d9f5c6aeeebe7741783ef79ed59a4857a7 Mon Sep 17 00:00:00 2001 From: mykola Date: Tue, 18 Nov 2025 14:43:15 +0300 Subject: [PATCH 3/3] feat: Added BaseTextSplitter and TextSplitter for semantic splitting text --- docs/source/index.rst | 2 + docs/source/models/splitters/base_splitter.md | 150 +++++++++ .../models/splitters/base_text_splitter.md | 89 ++++++ docs/source/models/splitters/index.md | 187 +++++++++++ docs/source/models/splitters/text_splitter.md | 207 +++++++++++++ docs/source/schemas.md | 96 ++++++ docs/source/schemas/box.md | 234 ++++++++++++++ docs/source/schemas/document.md | 95 ++++++ docs/source/schemas/index.md | 290 ++++++++++++++++++ docs/source/schemas/text_chunks.md | 122 ++++++++ docs/source/splitters.md | 51 +++ poetry.lock | 26 +- pyproject.toml | 1 + scaledp/models/splitters/BaseSplitter.py | 31 ++ scaledp/models/splitters/BaseTextSplitter.py | 177 +++++++++++ scaledp/models/splitters/TextSplitter.py | 49 +++ scaledp/models/splitters/__init__.py | 5 + scaledp/params.py | 50 +++ scaledp/schemas/TextChunks.py | 19 ++ tests/conftest.py | 22 ++ 
tests/models/splitters/__init__.py | 0 tests/models/splitters/test_text_splitter.py | 148 +++++++++ 22 files changed, 2050 insertions(+), 1 deletion(-) create mode 100644 docs/source/models/splitters/base_splitter.md create mode 100644 docs/source/models/splitters/base_text_splitter.md create mode 100644 docs/source/models/splitters/index.md create mode 100644 docs/source/models/splitters/text_splitter.md create mode 100644 docs/source/schemas.md create mode 100644 docs/source/schemas/box.md create mode 100644 docs/source/schemas/document.md create mode 100644 docs/source/schemas/index.md create mode 100644 docs/source/schemas/text_chunks.md create mode 100644 docs/source/splitters.md create mode 100644 scaledp/models/splitters/BaseSplitter.py create mode 100644 scaledp/models/splitters/BaseTextSplitter.py create mode 100644 scaledp/models/splitters/TextSplitter.py create mode 100644 scaledp/models/splitters/__init__.py create mode 100644 scaledp/schemas/TextChunks.py create mode 100644 tests/models/splitters/__init__.py create mode 100644 tests/models/splitters/test_text_splitter.py diff --git a/docs/source/index.rst b/docs/source/index.rst index df628ed..75a28cf 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -47,6 +47,8 @@ Benefits of using ScaleDP detectors.md ocr.md embeddings.md + splitters.md + schemas.md show_utils.md release_notes.md diff --git a/docs/source/models/splitters/base_splitter.md b/docs/source/models/splitters/base_splitter.md new file mode 100644 index 0000000..41befb9 --- /dev/null +++ b/docs/source/models/splitters/base_splitter.md @@ -0,0 +1,150 @@ +(BaseSplitter)= +# BaseSplitter + +## Overview + +`BaseSplitter` is the abstract base class for all text splitter transformers in the ScaleDP library. It extends PySpark's `Transformer` class and provides common functionality for splitting documents into chunks. This class defines the interface and shared parameters for all splitter implementations. 
+ +## Inheritance + +- Extends PySpark's `Transformer` for ML pipeline compatibility. +- Mixes in the following parameter mixins: + - `HasInputCol` - Input column containing documents + - `HasOutputCol` - Output column for results + - `HasKeepInputData` - Whether to preserve input data + - `HasChunkSize` - Maximum chunk size + - `HasChunkOverlap` - Overlap between chunks + - `HasNumPartitions` - Partition control + - `HasPartitionMap` - Enable distributed processing mode + - `HasWhiteList` - Whitelist filtering support + +## Key Features + +- **PySpark Integration**: Full compatibility with PySpark ML pipelines +- **Serialization**: Support for reading and writing model parameters +- **Flexible Configuration**: Extensive parameters for customization +- **Extensible Design**: Foundation for specialized splitter implementations +- **Batch Processing**: Support for both local and distributed processing modes + +## Class Hierarchy + +``` +BaseSplitter +├── BaseTextSplitter +│ └── TextSplitter (concrete implementation) +``` + +## Parameters + +| Parameter | Type | Description | Default | +|-------------------|---------|--------------------------------------------------|-----------------------------| +| inputCol | str | Input column name | varies by implementation | +| outputCol | str | Output column name | varies by implementation | +| keepInputData | bool | Keep input columns in output | True | +| chunkSize | int | Size of each chunk | 500 | +| chunkOverlap | int | Overlap between consecutive chunks | 0 | +| numPartitions | int | Number of partitions | 1 | +| partitionMap | bool | Use partitioned mapping (pandas_udf mode) | False | +| whiteList | list | Whitelist of allowed items | [] | + +## Abstract Methods + +Subclasses must implement the following abstract methods: + +### transform(dataset) +Transforms a Spark DataFrame by applying the splitter logic. 
+ +**Parameters:** +- `dataset` (pyspark.sql.DataFrame): Input DataFrame + +**Returns:** +- (pyspark.sql.DataFrame): DataFrame with split results + +## Usage Guidelines + +`BaseSplitter` is an abstract class and should not be instantiated directly. Instead, use concrete implementations like: + +- [`TextSplitter`](./text_splitter.md) - Semantic text splitting + +```python +# Correct: Use concrete implementation +from scaledp.models.splitters.TextSplitter import TextSplitter + +splitter = TextSplitter(chunk_size=500, chunk_overlap=50) +``` + +```python +# Incorrect: Do not instantiate BaseSplitter directly +from scaledp.models.splitters.BaseSplitter import BaseSplitter + +# This will raise an error +splitter = BaseSplitter() # Error! +``` + +## Creating Custom Splitters + +To create a custom splitter, inherit from `BaseSplitter` or `BaseTextSplitter`: + +```python +from scaledp.models.splitters.BaseTextSplitter import BaseTextSplitter +from scaledp.schemas.Document import Document +from scaledp.schemas.TextChunks import TextChunks + +class CustomSplitter(BaseTextSplitter): + """Custom splitter implementation.""" + + def split(self, document: Document) -> TextChunks: + """Implement custom splitting logic.""" + # Your splitting algorithm here + chunks = self._split_text(document.text) + return TextChunks( + path=document.path, + chunks=chunks, + exception="", + processing_time=0.0 + ) +``` + +## Pipeline Integration + +`BaseSplitter` and its subclasses are designed to work seamlessly with PySpark pipelines: + +```python +from pyspark.ml import Pipeline +from scaledp.models.splitters.TextSplitter import TextSplitter + +# Create pipeline stages +splitter = TextSplitter(chunk_size=500) + +# Create and fit pipeline +pipeline = Pipeline(stages=[splitter]) +model = pipeline.fit(training_data) + +# Transform data +results = model.transform(test_data) +``` + +## Serialization + +All splitters support PySpark's read/write functionality: + +```python +# Save a model +splitter = 
TextSplitter(chunk_size=500) +splitter.write().overwrite().save("path/to/splitter") + +# Load a model +loaded_splitter = TextSplitter.load("path/to/splitter") +``` + +## Related Classes + +- [`BaseTextSplitter`](./base_text_splitter.md) - Abstract base for text splitters +- [`TextSplitter`](./text_splitter.md) - Concrete semantic text splitter implementation +- [`Document`](#Document) - Input document schema +- [`TextChunks`](#TextChunks) - Output text chunks schema + +## See Also + +- [PySpark ML Transformers](https://spark.apache.org/docs/latest/ml-pipeline.html) +- [Transformer API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Transformer.html) diff --git a/docs/source/models/splitters/base_text_splitter.md b/docs/source/models/splitters/base_text_splitter.md new file mode 100644 index 0000000..1a9b810 --- /dev/null +++ b/docs/source/models/splitters/base_text_splitter.md @@ -0,0 +1,89 @@ +(BaseTextSplitter)= +# BaseTextSplitter + +## Overview + +`BaseTextSplitter` is an abstract base class for text splitting transformers in PySpark. It provides common functionality for splitting documents into chunks while preserving metadata like file paths and document types. It is designed for extensibility and serves as the foundation for concrete text splitting implementations like [`TextSplitter`](./text_splitter.md). + +The splitter operates on **Document struct columns**, which contain structured data including text content, file path, document type, and bounding boxes. + +## Inheritance + +- Inherits from [`BaseSplitter`](./base_splitter.md), which provides core Spark ML transformer functionality and schema handling. +- Mixes in `HasColumnValidator` and `HasDefaultEnum` for validation and enumeration support. +- Extends `DefaultParamsReadable` and `DefaultParamsWritable` for serialization support. 
+ +## Key Features + +- **Document-Centric**: Works with Document struct columns containing path, text, type, and bboxes +- **Metadata Preservation**: Maintains document metadata (path, document type) through the splitting process +- **Flexible Chunking**: Configurable chunk size and overlap for text splitting +- **Distributed Processing**: Supports both regular UDF and pandas_udf (partitionMap) modes for Spark batch processing +- **Error Handling**: Captures and reports processing exceptions in output + +## Usage Example + +```python +from scaledp.models.splitters.TextSplitter import TextSplitter +from scaledp.schemas.Document import Document + +# Create a splitter with custom parameters +splitter = TextSplitter( + inputCol="document", # Column containing Document structs + outputCol="chunks", # Output column for TextChunks + chunk_size=500, # Characters per chunk + chunk_overlap=50, # Character overlap between chunks +) + +# Use in a Spark pipeline +result_df = splitter.transform(input_df) +``` + +## Parameters + +| Parameter | Type | Description | Default | +|-------------------|---------|--------------------------------------------------|-----------------------------| +| inputCol | str | Input Document struct column | "document" | +| outputCol | str | Output column for TextChunks results | "chunks" | +| keepInputData | bool | Keep input document column in output | True | +| chunk_size | int | Size of each chunk in characters | 500 | +| chunk_overlap | int | Number of characters to overlap between chunks | 0 | +| numPartitions | int | Number of partitions for coalescing | 1 | +| partitionMap | bool | Use pandas_udf for distributed processing | False | + +## Input Schema + +The input column must contain **Document struct**. For detailed schema information, see [Document Schema Documentation](../../schemas/document.md). 
+ +**Key Fields:** +- `path` - File path or document identifier +- `text` - Text content to split +- `type` - Document type (e.g., "text", "pdf") +- `bboxes` - Bounding boxes (empty for text documents) +- `exception` - Error message if any (optional) + +## Output Schema + +The output column contains **TextChunks struct**. For detailed schema information, see [TextChunks Schema Documentation](../../schemas/text_chunks.md). + +**Key Fields:** +- `path` - Original document path +- `chunks` - List of text chunks +- `exception` - Error message if splitting failed +- `processing_time` - Time taken to split document (seconds) + +## Notes + +- The splitter is abstract and cannot be instantiated directly. Use concrete implementations like `TextSplitter`. +- Input documents must contain text and path information in the Document struct format. +- Chunk overlap can help maintain context between chunks for semantic meaning. +- The `partitionMap` option enables pandas_udf mode for better performance on large datasets but requires careful configuration. +- All errors during splitting are captured and reported in the `exception` field of the output. + +## Related Classes + +- [`TextSplitter`](./text_splitter.md) - Concrete implementation using semantic text splitting +- [`BaseSplitter`](./base_splitter.md) - Base transformer for all splitter implementations +- [`Document`](../../schemas/document.md) - Input schema class +- [`TextChunks`](../../schemas/text_chunks.md) - Output schema class +- [`Box`](../../schemas/box.md) - Bounding box schema class diff --git a/docs/source/models/splitters/index.md b/docs/source/models/splitters/index.md new file mode 100644 index 0000000..a422958 --- /dev/null +++ b/docs/source/models/splitters/index.md @@ -0,0 +1,187 @@ +(splitters)= +# Text Splitters + +Text splitters are transformers that divide documents into smaller, manageable chunks while preserving metadata and handling errors gracefully. 
They are essential for processing large documents in downstream tasks like embedding generation or information extraction. + +## Overview + +ScaleDP provides a flexible text splitting framework built on PySpark ML pipelines: + +- **Semantic Splitting**: Intelligently split text at natural boundaries +- **Distributed Processing**: Scale to large datasets with Spark +- **Metadata Preservation**: Keep document information through splitting +- **Error Handling**: Graceful error reporting and recovery +- **Pipeline Integration**: Seamless integration with PySpark ML pipelines + +## Available Splitters + +### [TextSplitter](./text_splitter.md) +Main text splitter implementation using semantic splitting. Intelligently divides text based on content boundaries while respecting chunk size and overlap constraints. + +**Key Features:** +- Semantic text splitting +- Configurable chunk size and overlap +- Support for distributed processing +- Metadata preservation + +## Base Classes + +### [BaseTextSplitter](./base_text_splitter.md) +Abstract base class for text splitters. Provides common functionality for operating on Document struct columns and producing TextChunks outputs. + +### [BaseSplitter](./base_splitter.md) +Foundation class for all splitter transformers. Extends PySpark's Transformer and defines the common interface for all splitter implementations. 
+ +## Quick Start + +```python +from scaledp.models.splitters.TextSplitter import TextSplitter +from scaledp.schemas.Document import Document + +# Create a splitter +splitter = TextSplitter( + inputCol="document", # Input Document struct column + outputCol="chunks", # Output TextChunks column + chunk_size=500, # Max chunk size + chunk_overlap=50, # Character overlap +) + +# Use in a Spark pipeline +result_df = splitter.transform(input_df) + +# Access results +for row in result_df.collect(): + print(f"Path: {row.chunks.path}") + print(f"Chunks: {len(row.chunks.chunks)}") + print(f"Time: {row.chunks.processing_time}s") +``` + +## Input/Output Schema + +### Input: [Document Struct](../../schemas/document.md) + +| Field | Type | Description | +|------------|-------------------|-------------------------------| +| path | string | Document identifier/file path | +| text | string | Text content to split | +| type | string | Document type | +| bboxes | array | Bounding boxes (optional) | +| exception | string | Error message (optional) | + +For detailed information, see [Document Schema Documentation](../../schemas/document.md). + +### Output: [TextChunks Struct](../../schemas/text_chunks.md) + +| Field | Type | Description | +|------------------|---------------|--------------------------| +| path | string | Original document path | +| chunks | array | List of text chunks | +| exception | string | Error message if any | +| processing_time | double | Processing duration (sec)| + +For detailed information, see [TextChunks Schema Documentation](../../schemas/text_chunks.md). 
+ +## Use Cases + +### Document Processing Pipeline +```python +from pyspark.ml import Pipeline + +pipeline = Pipeline(stages=[ + text_cleaner, # Clean input text + text_splitter, # Split into chunks + embedding_generator, # Generate embeddings +]) + +result = pipeline.fit(data).transform(data) +``` + +### Large Document Handling +```python +# Split large documents for processing +splitter = TextSplitter( + chunk_size=1000, # Larger chunks + chunk_overlap=200, # More context +) +``` + +### Batch Processing +```python +# Process multiple documents +result_df = splitter.transform(multi_doc_df) + +# Filter errors +valid_results = result_df.filter("chunks.exception == ''") +``` + +## Performance Tuning + +### Chunk Size Selection +- **Smaller chunks** (100-300): Better for semantic search, more processing overhead +- **Medium chunks** (500-1000): Good balance for most use cases +- **Large chunks** (2000+): Faster processing, less granular results + +### Overlap Configuration +- **No overlap** (0): Fastest processing, potential context loss +- **Small overlap** (10-50): Balanced approach for most cases +- **Large overlap** (200+): Maximum context preservation + +### Distributed Processing +```python +# Use pandas_udf mode for large datasets +splitter = TextSplitter( + partitionMap=True, # Enable pandas_udf mode + numPartitions=100, # Set target partitions +) +``` + +## Extending Splitters + +Create custom splitters by inheriting from `BaseTextSplitter`: + +```python +from scaledp.models.splitters.BaseTextSplitter import BaseTextSplitter +from scaledp.schemas.Document import Document +from scaledp.schemas.TextChunks import TextChunks + +class MyCustomSplitter(BaseTextSplitter): + def split(self, document: Document) -> TextChunks: + # Implement custom splitting logic + chunks = my_splitting_algorithm(document.text) + return TextChunks( + path=document.path, + chunks=chunks, + exception="", + processing_time=0.0 + ) +``` + +## Error Handling + +All splitters handle errors 
gracefully: + +```python +result_df = splitter.transform(df) + +# Check for errors +error_df = result_df.filter("chunks.exception != ''") +print(f"Failed: {error_df.count()}") + +# Process successful results +success_df = result_df.filter("chunks.exception == ''") +``` + +## Documentation Index + +- [BaseSplitter](./base_splitter.md) - Foundation class +- [BaseTextSplitter](./base_text_splitter.md) - Text splitting base class +- [TextSplitter](./text_splitter.md) - Semantic text splitter implementation + +## Related Resources + +- [Document Schema](../../schemas/document.md) - Input document structure +- [TextChunks Schema](../../schemas/text_chunks.md) - Output chunks structure +- [Box Schema](../../schemas/box.md) - Bounding box structure +- [All Data Schemas](../../schemas/index.md) - Complete schema documentation +- [PySpark ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html) +- [Transformer API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Transformer.html) diff --git a/docs/source/models/splitters/text_splitter.md b/docs/source/models/splitters/text_splitter.md new file mode 100644 index 0000000..73751ff --- /dev/null +++ b/docs/source/models/splitters/text_splitter.md @@ -0,0 +1,207 @@ +(TextSplitter)= +# TextSplitter + +## Overview + +`TextSplitter` is a concrete implementation of [`BaseTextSplitter`](./base_text_splitter.md) that uses semantic text splitting to divide documents into meaningful chunks. It splits text based on content boundaries while respecting chunk size and overlap constraints. The splitter operates on **Document struct columns** and preserves document metadata through the splitting process. + +## Inheritance + +- Inherits from [`BaseTextSplitter`](./base_text_splitter.md), which provides the core text splitting framework. +- Uses `semantic_text_splitter` library for intelligent chunking based on content semantics. 
+ +## Key Features + +- **Semantic Splitting**: Intelligently splits text at natural content boundaries (sentences, paragraphs) +- **Configurable Chunk Size**: Control the maximum size of each chunk +- **Overlap Support**: Maintain context between chunks with configurable overlap +- **Metadata Preservation**: Keeps document path and type information throughout splitting +- **Batch Processing**: Supports distributed processing across Spark cluster +- **Error Handling**: Gracefully handles malformed or problematic text + +## Usage Example + +```python +from scaledp.models.splitters.TextSplitter import TextSplitter +from pyspark.ml import PipelineModel + +# Create a text splitter +text_splitter = TextSplitter( + inputCol="document", # Input Document struct column + outputCol="chunks", # Output TextChunks column + chunk_size=500, # Max chunk size in characters + chunk_overlap=50, # Character overlap between chunks + keepInputData=True, # Keep original document in output +) + +# Use in a Spark pipeline +pipeline = PipelineModel(stages=[text_splitter]) +result_df = pipeline.transform(input_df) + +# Inspect results +for row in result_df.collect(): + print(f"Document: {row.chunks.path}") + print(f"Number of chunks: {len(row.chunks.chunks)}") + print(f"Processing time: {row.chunks.processing_time}s") + if row.chunks.exception: + print(f"Error: {row.chunks.exception}") +``` + +## Parameters + +All parameters from [`BaseTextSplitter`](./base_text_splitter.md) are inherited. No additional parameters specific to `TextSplitter`. 
+ +| Parameter | Type | Description | Default | +|-------------------|---------|--------------------------------------------------|-----------------------------| +| inputCol | str | Input Document struct column | "document" | +| outputCol | str | Output column for TextChunks results | "chunks" | +| keepInputData | bool | Keep input document column in output | True | +| chunk_size | int | Size of each chunk in characters | 500 | +| chunk_overlap | int | Number of characters to overlap between chunks | 0 | +| numPartitions | int | Number of partitions for coalescing | 1 | +| partitionMap | bool | Use pandas_udf for distributed processing | False | + +## Input Schema + +Requires a DataFrame column containing **Document struct**. For detailed schema information and examples, see [Document Schema Documentation](../../schemas/document.md). + +```python +from scaledp.schemas.Document import Document + +# Document struct fields +Document( + path: str, # File path or identifier + text: str, # Text content to split + type: str, # Document type (e.g., "text", "pdf") + bboxes: list[Box], # Bounding boxes (empty for plain text) + exception: str = "" # Optional error message +) +``` + +## Output Schema + +Produces output column with **TextChunks struct**. For detailed schema information and examples, see [TextChunks Schema Documentation](../../schemas/text_chunks.md). + +```python +from scaledp.schemas.TextChunks import TextChunks + +# TextChunks struct fields +TextChunks( + path: str, # Original document path + chunks: list[str], # List of text chunks + exception: str = "", # Error message if any + processing_time: float = 0.0 # Splitting duration in seconds +) +``` + +## Examples + +### Example 1: Basic Text Splitting + +```python +from scaledp.schemas.Document import Document +from scaledp.models.splitters.TextSplitter import TextSplitter + +# Create a test document +long_text = "This is a long document. 
" * 100 +doc = Document( + path="test.txt", + text=long_text, + type="text", + bboxes=[] +) + +# Create splitter +splitter = TextSplitter(chunk_size=200, chunk_overlap=20) + +# Split the document +result = splitter.split(doc) +print(f"Created {len(result.chunks)} chunks") +print(f"Processing time: {result.processing_time}ms") +``` + +### Example 2: DataFrame Transformation + +```python +from pyspark.sql.functions import col, lit, struct +from scaledp.models.splitters.TextSplitter import TextSplitter +from scaledp.schemas.Document import Document + +# Create a DataFrame with Document structs +df = spark.createDataFrame([ + { + "document": { + "path": "file1.txt", + "text": "First document content...", + "type": "text", + "bboxes": [], + "exception": "" + } + } +]) + +# Apply text splitter +splitter = TextSplitter(chunk_size=500, chunk_overlap=50) +result_df = splitter.transform(df) + +# View results +result_df.select("document.path", "chunks.chunks").show() +``` + +### Example 3: Pipeline Integration + +```python +from pyspark.ml import Pipeline +from scaledp.models.splitters.TextSplitter import TextSplitter + +# Create a text splitter stage for a pipeline +text_splitter = TextSplitter( + inputCol="document", + outputCol="chunks", + chunk_size=500, + chunk_overlap=50, +) + +# Create and fit pipeline +pipeline = Pipeline(stages=[text_splitter]) +model = pipeline.fit(input_df) + +# Transform data +output_df = model.transform(input_df) +``` + +## Performance Considerations + +- **Chunk Size**: Larger chunks process faster but may be too large for downstream tasks +- **Chunk Overlap**: Increases output size but helps maintain context between chunks +- **Batch Mode (partitionMap=False)**: Suitable for small documents or single-threaded processing +- **Pandas Mode (partitionMap=True)**: Better for large-scale distributed processing +- **numPartitions**: For pandas mode, controls how many partitions to coalesce to + +## Error Handling + +The splitter handles various error 
cases gracefully: + +```python +# Handling errors in output +for row in result_df.collect(): + if row.chunks.exception: + print(f"Error processing {row.chunks.path}: {row.chunks.exception}") + else: + print(f"Successfully split into {len(row.chunks.chunks)} chunks") +``` + +## Notes + +- Semantic splitting provides better quality chunks than simple character-based splitting +- Document path is preserved in output for tracking and debugging +- Processing time is recorded for performance monitoring +- Empty documents are handled gracefully and return an empty chunk list + +## Related Classes + +- [`BaseTextSplitter`](./base_text_splitter.md) - Abstract base class for text splitters +- [`BaseSplitter`](./base_splitter.md) - Base transformer for all splitter implementations +- [`Document`](../../schemas/document.md) - Input schema for documents +- [`TextChunks`](../../schemas/text_chunks.md) - Output schema for text chunks +- [`Box`](../../schemas/box.md) - Bounding box schema diff --git a/docs/source/schemas.md b/docs/source/schemas.md new file mode 100644 index 0000000..395a319 --- /dev/null +++ b/docs/source/schemas.md @@ -0,0 +1,96 @@ +Data Schemas +============ + +## Overview + +This section provides comprehensive documentation on data schemas used throughout ScaleDP. These structured schemas ensure type safety, consistency, and clarity across all transformers and pipelines. They serve as the foundation for document processing workflows. + +ScaleDP uses dataclass-based schemas that seamlessly integrate with both Python and PySpark SQL for efficient distributed processing. 
+ +## Core Schemas + +* [**Document**](./schemas/document.md) - Represents a document with text, metadata, and layout information +* [**TextChunks**](./schemas/text_chunks.md) - Represents the output of text splitting operations +* [**Box**](./schemas/box.md) - Represents bounding boxes for spatial information + +## Quick Reference + +| Schema | Purpose | Used By | +|------------|--------------------------------------|----------------------| +| Document | Input document representation | Text Splitters, NER | +| TextChunks| Split text chunks with metadata | Embeddings, Search | +| Box | Bounding box and spatial info | OCR, Layout Analysis | + +## Quick Start + +### Creating a Document + +```python +from scaledp.schemas.Document import Document + +doc = Document( + path="/path/to/file.txt", + text="Document content here...", + type="text", + bboxes=[] +) +``` + + +### Using Bounding Boxes + +```python +from scaledp.schemas.Box import Box + +box = Box( + text="Text content", + score=0.95, + x=10, y=20, + width=100, height=50, + angle=0.0 +) +``` + +## Processing Pipeline + +``` +Document (Input) + ↓ +Text Splitter + ↓ +TextChunks (Split Results) + ↓ +Embeddings / NER / Other Processing + ↓ +Results +``` + +## Schema Integration + +All schemas provide seamless integration with: + +- **Python** - Native dataclasses with type hints +- **PySpark SQL** - `.get_schema()` method for SQL schema +- **DataFrames** - Direct use in Spark DataFrames +- **Pipelines** - Compatible with PySpark ML pipelines + +## Best Practices + +1. **Always provide path information** - Use meaningful file paths for tracking +2. **Set appropriate document types** - Use specific types (pdf, text, image) +3. **Include bounding boxes** - When layout information is available +4. 
**Track processing errors** - Capture exceptions in schema fields + +For detailed information on each schema, see: + +* [**Document Schema Documentation**](./schemas/document.md) +* [**TextChunks Schema Documentation**](./schemas/text_chunks.md) +* [**Box Schema Documentation**](./schemas/box.md) +* [**Complete Schemas Overview**](./schemas/index.md) + +## Related Sections + +* [**Text Splitters**](./splitters.md) - Document splitting transformers +* [**Embeddings**](./embeddings.md) - Text embedding generation +* [**Image Processing**](./image_processing.md) - Image document processing +* [**PDF Processing**](./pdf_processing.md) - PDF document handling diff --git a/docs/source/schemas/box.md b/docs/source/schemas/box.md new file mode 100644 index 0000000..d594623 --- /dev/null +++ b/docs/source/schemas/box.md @@ -0,0 +1,234 @@ +(Box)= +# Box Schema + +## Overview + +`Box` is a structured data schema that represents a bounding box with positional and text information. It is used to store layout information such as text regions, detected objects, or other spatial elements within documents. Boxes are typically part of a Document schema when processing images or PDFs with layout awareness. 
+ +## Schema Structure + +```python +from scaledp.schemas.Box import Box +from dataclasses import dataclass + +# Box dataclass definition +@dataclass(order=True) +class Box: + text: str # Text content of the box + score: float # Confidence score + x: int # X coordinate (top-left) + y: int # Y coordinate (top-left) + width: int # Box width in pixels + height: int # Box height in pixels + angle: float = 0.0 # Rotation angle in degrees (default 0) +``` + +## Fields + +| Field | Type | Default | Description | +|--------|-------|---------|------------------------------------------------| +| text | str | - | Text content or label of the box | +| score | float | - | Confidence score (typically 0.0 to 1.0) | +| x | int | - | X coordinate of top-left corner | +| y | int | - | Y coordinate of top-left corner | +| width | int | - | Width of the bounding box | +| height | int | - | Height of the bounding box | +| angle | float | 0.0 | Rotation angle in degrees (-180 to 180) | + +## Usage Examples + +### Creating Boxes + +```python +from scaledp.schemas.Box import Box + +# Simple axis-aligned box +box = Box( + text="Title", + score=0.95, + x=10, + y=20, + width=300, + height=50, + angle=0.0 +) + +# Rotated box +rotated_box = Box( + text="Rotated Text", + score=0.89, + x=100, + y=150, + width=200, + height=30, + angle=45.0 # 45 degrees rotation +) +``` + +### Creating from Bounding Box Coordinates + +```python +from scaledp.schemas.Box import Box + +# From [x1, y1, x2, y2] format +bbox = [10, 20, 110, 70] # x1, y1, x2, y2 + +box = Box.from_bbox( + box=bbox, + angle=0.0, + label="Text", + score=0.92 +) +# Result: Box(text="Text", score=0.92, x=10, y=20, width=100, height=50, angle=0.0) +``` + +### Creating from Polygon Points + +```python +from scaledp.schemas.Box import Box + +# From polygon coordinates (quadrilateral) +polygon_points = [ + (10, 20), # top-left + (310, 20), # top-right + (310, 70), # bottom-right + (10, 70) # bottom-left +] + +box = Box.from_polygon( + 
polygon_points=polygon_points, + text="Box Text", + score=0.95, + padding=0 +) +``` + +## Methods + +### Bounding Box Operations + +#### `to_string()` +```python +box = Box(text="Title", score=0.95, x=10, y=20, width=100, height=50) +box = box.to_string() +# Ensures text is string type +``` + +#### `scale(factor, padding)` +```python +# Scale box by factor and add padding +original = Box(text="Text", score=0.95, x=100, y=100, width=200, height=50) + +# Scale by 2.0 (double size) and add 10px padding +scaled = original.scale(factor=2.0, padding=10) +# Result: x=190, y=190, width=410, height=110 +``` + +#### `shape(padding)` +```python +box = Box(text="Text", score=0.95, x=10, y=20, width=100, height=50) + +# Get corner coordinates +corners = box.shape(padding=0) +# Result: [(10, 20), (110, 70)] + +# With padding +corners_padded = box.shape(padding=5) +# Result: [(5, 15), (115, 75)] +``` + +#### `bbox(padding)` +```python +box = Box(text="Text", score=0.95, x=10, y=20, width=100, height=50) + +# Get bbox coordinates [x1, y1, x2, y2] +bbox = box.bbox(padding=0) +# Result: [10, 20, 110, 70] + +# With padding +bbox_padded = box.bbox(padding=5) +# Result: [5, 15, 115, 75] +``` + +### Geometric Operations + +#### `is_rotated()` +```python +# Check if box is rotated (angle >= 3 degrees) +box = Box(..., angle=45.0) +if box.is_rotated(): + print("Box is rotated") +``` + +#### `iou(box1, box2)` +```python +from scaledp.schemas.Box import Box + +box1 = Box(text="A", score=0.95, x=0, y=0, width=100, height=100) +box2 = Box(text="B", score=0.92, x=50, y=50, width=100, height=100) + +# Calculate Intersection over Union +iou = Box.iou(box1, box2) +# Result: 0.14... 
(25% overlap) +``` + +#### `merge(box1, box2)` +```python +from scaledp.schemas.Box import Box + +box1 = Box(text="A", score=0.95, x=10, y=10, width=100, height=50) +box2 = Box(text="B", score=0.92, x=80, y=40, width=100, height=50) + +# Merge boxes (returns minimal bounding rectangle) +merged = Box.merge(box1, box2) +# Result: Box(text="A", score=0.95, x=10, y=10, width=170, height=80) +``` + +### Batch Operations + +#### `is_on_same_line(box1, box2, angle_thresh, line_thresh)` +```python +from scaledp.schemas.Box import Box + +box1 = Box(text="Word1", score=0.95, x=0, y=0, width=50, height=20, angle=0) +box2 = Box(text="Word2", score=0.92, x=60, y=2, width=50, height=20, angle=0) + +# Check if boxes are on same text line +same_line = Box.is_on_same_line( + box1, box2, + angle_thresh=10.0, # Max angle difference + line_thresh=0.5 # Max normalized center difference +) +# Result: True (boxes are roughly aligned) +``` + +#### `merge_overlapping_boxes(boxes, iou_threshold, angle_thresh, line_thresh)` +```python +from scaledp.schemas.Box import Box + +boxes = [ + Box(text="A", score=0.95, x=0, y=0, width=100, height=50), + Box(text="B", score=0.92, x=30, y=10, width=100, height=50), # Overlaps with A + Box(text="C", score=0.90, x=200, y=0, width=100, height=50), # No overlap +] + +# Merge overlapping boxes +merged = Box.merge_overlapping_boxes( + boxes, + iou_threshold=0.3, + angle_thresh=10.0, + line_thresh=0.5 +) +# Result: [merged_A_B, C] +``` + +## Related Schemas + +- [Document](./document.md) - Contains list of boxes +- [TextChunks](./text_chunks.md) - Output schema + +## See Also + +- [Document Schema](./document.md) +- [Text Splitters](../models/splitters/index.md) +- [Image Processing](../image_processing.md) diff --git a/docs/source/schemas/document.md b/docs/source/schemas/document.md new file mode 100644 index 0000000..b8f91f2 --- /dev/null +++ b/docs/source/schemas/document.md @@ -0,0 +1,95 @@ +(Document)= +# Document Schema + +## Overview + +`Document` is 
a structured data schema that represents a document with its text content, metadata, and structural information. It serves as the standard input format for text processing transformers in ScaleDP, including text splitters, embeddings, and NER models. + +The Document schema contains all necessary information for processing a document through a pipeline: the text content, file path, document type, and bounding box information for layout-aware processing. + +## Schema Structure + +```python +from scaledp.schemas.Document import Document +from scaledp.schemas.Box import Box + +# Document dataclass definition +@dataclass +class Document: + path: str # File path or document identifier + text: str # Text content + type: str # Document type (e.g., "text", "pdf", "image") + bboxes: list[Box] # List of bounding boxes + exception: str = "" # Error message (optional) +``` + +## Fields + +| Field | Type | Required | Description | +|-----------|--------------|----------|------------------------------------------------------| +| path | str | Yes | File path or unique document identifier | +| text | str | Yes | Text content of the document | +| type | str | Yes | Document type (e.g., "text", "pdf", "image") | +| bboxes | list[Box] | No | List of bounding boxes for text regions | +| exception | str | No | Error message if document processing failed | + +## Related Schemas + +### Box +The `Box` schema represents a bounding box with position and size information: + +```python +@dataclass +class Box: + text: str # Text content of the box + score: float # Confidence score + x: int # X coordinate + y: int # Y coordinate + width: int # Box width + height: int # Box height + angle: float = 0.0 # Rotation angle (degrees) +``` + +## Usage Examples + +### Creating a Document from Text + +```python +from scaledp.schemas.Document import Document + +# Create a simple text document +doc = Document( + path="test.txt", + text="This is a sample document.", + type="text", + bboxes=[] +) +``` + +## 
PySpark Schema + +When converting to PySpark SQL schema: + +```python +from scaledp.schemas.Document import Document + +schema = Document.get_schema() +# StructType([ +# StructField('path', StringType(), True), +# StructField('text', StringType(), True), +# StructField('type', StringType(), True), +# StructField('bboxes', ArrayType(BoxType(...)), True), +# StructField('exception', StringType(), True) +# ]) +``` + +## Related Schemas + +- [Box](./box.md) - Bounding box schema +- [TextChunks](./text_chunks.md) - Output schema for text splitters + +## See Also + +- [Text Splitters](../models/splitters/index.md) +- [Embeddings](../models/embeddings.md) +- [NER Models](../models/ner.md) diff --git a/docs/source/schemas/index.md b/docs/source/schemas/index.md new file mode 100644 index 0000000..cebe89c --- /dev/null +++ b/docs/source/schemas/index.md @@ -0,0 +1,290 @@ +(schemas)= +# Data Schemas + +ScaleDP uses structured data schemas to represent documents, chunks, and spatial information throughout the processing pipeline. These schemas provide type safety, consistency, and clarity across all transformers. + +## Core Schemas + +### [Document](./document.md) +Represents a document with text content, file path, and metadata. + +**Key Fields:** +- `path` - File path or document identifier +- `text` - Text content +- `type` - Document type (text, pdf, image) +- `bboxes` - List of bounding boxes +- `exception` - Error message (optional) + +**Usage:** Input format for text splitters, embeddings, and NER models. + +### [TextChunks](./text_chunks.md) +Represents the output of text splitting operations. + +**Key Fields:** +- `path` - Original document path +- `chunks` - List of text chunks +- `exception` - Error message if any +- `processing_time` - Processing duration + +**Usage:** Output from text splitters, input for embeddings. + +### [Box](./box.md) +Represents a bounding box with position and text information. 
+ +**Key Fields:** +- `text` - Text content or label +- `score` - Confidence score +- `x`, `y` - Top-left coordinates +- `width`, `height` - Dimensions +- `angle` - Rotation angle + +**Usage:** Layout information in documents, OCR results. + +## Schema Hierarchy + +``` +Document +├── path: str +├── text: str +├── type: str +├── bboxes: list[Box] +│ ├── text: str +│ ├── score: float +│ ├── x: int +│ ├── y: int +│ ├── width: int +│ ├── height: int +│ └── angle: float +└── exception: str + +TextChunks +├── path: str +├── chunks: list[str] +├── exception: str +└── processing_time: float +``` + +## Processing Pipeline Overview + +``` +Document (Input) + ↓ +Text Splitter + ↓ +TextChunks (Split Results) + ↓ +Embeddings / NER / Other Processing + ↓ +Results +``` + +## Quick Reference + +| Schema | Purpose | Input To | Output From | +|-------------|------------------------------|-------------------|-----------------| +| Document | Represents document data | Text Splitter, NER | PDF Reader | +| TextChunks | Represents text chunks | Embeddings, Search | Text Splitter | +| Box | Represents spatial region | Layout Analysis | OCR, Detector | + +## Common Operations + +### Creating a Document + +```python +from scaledp.schemas.Document import Document + +doc = Document( + path="/path/to/file.txt", + text="Document content here...", + type="text", + bboxes=[] +) +``` + +### Splitting Documents + +```python +from scaledp.models.splitters.TextSplitter import TextSplitter + +splitter = TextSplitter(chunk_size=500, chunk_overlap=50) +result_df = splitter.transform(df) +# Output: DataFrame with TextChunks +``` + +### Processing Chunks + +```python +from pyspark.sql.functions import explode + +# Flatten chunks for further processing +flattened = result_df.select( + "chunks.path", + explode("chunks.chunks").alias("chunk") +) +``` + +### Analyzing Box Operations + +```python +from scaledp.schemas.Box import Box + +# Calculate IoU between boxes +iou = Box.iou(box1, box2) + +# Merge 
overlapping boxes +merged = Box.merge_overlapping_boxes(boxes, iou_threshold=0.3) + +# Check if rotated +if box.is_rotated(): + handle_rotated_box(box) +``` + +## PySpark Schema Access + +All schemas provide `.get_schema()` method for PySpark integration: + +```python +from scaledp.schemas.Document import Document +from scaledp.schemas.TextChunks import TextChunks +from scaledp.schemas.Box import Box + +# Get PySpark SQL schema +doc_schema = Document.get_schema() +chunks_schema = TextChunks.get_schema() +box_schema = Box.get_schema() + +# Use in DataFrames +df = spark.createDataFrame(data, schema=doc_schema) +``` + +## Error Handling + +All schemas support error tracking: + +```python +from scaledp.schemas.Document import Document +from scaledp.schemas.TextChunks import TextChunks + +# Document with error +doc = Document( + path="file.txt", + text="", + type="text", + bboxes=[], + exception="Failed to read file" +) + +# TextChunks with error +result = TextChunks( + path="file.txt", + chunks=None, + exception="Splitting failed: Invalid input", + processing_time=0.0 +) + +# Check for errors in pipeline +failed = result_df.filter("chunks.exception != ''") +``` + +## Best Practices + +### 1. Always Provide Path Information +```python +# Good: includes path for tracking +doc = Document(path="document.pdf", ...) + +# Avoid: missing path +doc = Document(path="", ...) +``` + +### 2. Set Document Types Correctly +```python +# Good: specific types +Document(..., type="pdf", ...) +Document(..., type="image", ...) +Document(..., type="text", ...) + +# Avoid: generic types +Document(..., type="document", ...) +``` + +### 3. Include Bounding Boxes When Available +```python +# Good: includes layout info +doc = Document( + path="page.jpg", + text=text, + type="image", + bboxes=detected_boxes +) + +# Acceptable: empty for plain text +doc = Document( + path="text.txt", + text=text, + type="text", + bboxes=[] +) +``` + +### 4. 
Track Processing Errors +```python +# Good: capture and report errors +try: + result = process(doc) +except Exception as e: + result = TextChunks( + path=doc.path, + chunks=None, + exception=str(e), + processing_time=0.0 + ) +``` + +## Integration Examples + +### Text Processing Pipeline +```python +from pyspark.ml import Pipeline +from scaledp.models.splitters.TextSplitter import TextSplitter +from scaledp.models.embeddings.HFEmbeddings import HFEmbeddings + +# Create pipeline with schemas +stages = [ + TextSplitter(inputCol="document", outputCol="chunks"), + HFEmbeddings(inputCol="chunks.chunks", outputCol="embeddings"), +] + +pipeline = Pipeline(stages=stages) +result = pipeline.fit(df).transform(df) +``` + +### Batch Processing +```python +from scaledp.schemas.Document import Document + +# Create documents from files +documents = [] +for file_path in files: + doc = Document( + path=file_path, + text=read_file(file_path), + type="text", + bboxes=[] + ) + documents.append(doc) + +# Process as DataFrame +df = spark.createDataFrame( + [(doc,) for doc in documents], + ["document"] +) +``` + +## See Also + +- [Document Schema](./document.md) - Detailed documentation +- [TextChunks Schema](./text_chunks.md) - Detailed documentation +- [Box Schema](./box.md) - Detailed documentation +- [Text Splitters](../models/splitters/index.md) +- [Embeddings](../models/embeddings.md) diff --git a/docs/source/schemas/text_chunks.md b/docs/source/schemas/text_chunks.md new file mode 100644 index 0000000..e62f556 --- /dev/null +++ b/docs/source/schemas/text_chunks.md @@ -0,0 +1,122 @@ +(TextChunks)= +# TextChunks Schema + +## Overview + +`TextChunks` is a structured data schema that represents the output of text splitting operations. It contains the split text chunks along with metadata about the original document and processing information. This schema is produced by text splitter transformers and serves as input for downstream processing tasks like embedding generation. 
+ +The TextChunks schema maintains traceability by preserving the document path and captures processing details for debugging and monitoring. + +## Schema Structure + +```python +from scaledp.schemas.TextChunks import TextChunks +from typing import Optional + +# TextChunks dataclass definition +@dataclass(order=True) +class TextChunks: + path: Optional[str] # Original document path + chunks: Optional[list[str]] # List of text chunks + exception: Optional[str] = "" # Error message if any + processing_time: Optional[float] = 0.0 # Processing duration in seconds +``` + +## Fields + +| Field | Type | Required | Description | +|------------------|-------------------|----------|------------------------------------------------| +| path | Optional[str] | No | Original document file path | +| chunks | Optional[list[str]]| No | List of text chunks from splitting | +| exception | Optional[str] | No | Error message if splitting failed | +| processing_time | Optional[float] | No | Time taken to split document (in seconds) | + +## Usage Examples + +### Basic TextChunks Creation + +```python +from scaledp.schemas.TextChunks import TextChunks + +# Successful split result +result = TextChunks( + path="/documents/file.txt", + chunks=[ + "First chunk of text...", + "Second chunk of text...", + "Third chunk of text..." 
+ ], + exception="", + processing_time=0.125 +) +``` + + +## PySpark Schema + +```python +from scaledp.schemas.TextChunks import TextChunks + +schema = TextChunks.get_schema() +# StructType([ +# StructField('path', StringType(), True), +# StructField('chunks', ArrayType(StringType()), True), +# StructField('exception', StringType(), True), +# StructField('processing_time', DoubleType(), True) +# ]) +``` + +## Processing Pipeline Integration + +### With Text Splitter + +```python +from scaledp.models.splitters.TextSplitter import TextSplitter + +# TextSplitter produces TextChunks output +text_splitter = TextSplitter( + inputCol="document", + outputCol="chunks", # Output is TextChunks schema + chunk_size=500, +) + +result_df = text_splitter.transform(input_df) +# result_df has column "chunks" of type TextChunks +``` + +### Flattening Chunks + +```python +from pyspark.sql.functions import explode + +# Flatten chunks for further processing +flattened = result_df.select( + "chunks.path", + explode("chunks.chunks").alias("chunk"), + "chunks.processing_time" +) +``` + +### Chunk Distribution + +```python +from pyspark.sql.functions import size + +# Analyze chunk distribution +result_df.select( + "chunks.path", + size("chunks.chunks").alias("num_chunks"), +).groupBy().agg({"num_chunks": "avg"}).show() +``` + +## Related Schemas + +- [Document](./document.md) - Input schema for text splitting +- [Box](./box.md) - Bounding box schema + +## See Also + +- [Text Splitters](../models/splitters/index.md) +- [BaseTextSplitter](../models/splitters/base_text_splitter.md) +- [TextSplitter](../models/splitters/text_splitter.md) +- [Embeddings](../models/embeddings.md) diff --git a/docs/source/splitters.md b/docs/source/splitters.md new file mode 100644 index 0000000..1271f1f --- /dev/null +++ b/docs/source/splitters.md @@ -0,0 +1,51 @@ +Text Splitters +============== + +## Overview + +This section provides comprehensive documentation on text splitters in ScaleDP. 
Text splitters divide documents into smaller, manageable chunks while preserving metadata and handling errors gracefully. They are essential for processing large documents in downstream tasks like embedding generation or information extraction. + +ScaleDP provides a flexible text splitting framework built on PySpark ML pipelines with support for semantic splitting, distributed processing, and metadata preservation. + +## Text Splitters + +* [**TextSplitter**](./models/splitters/text_splitter.md) - Semantic text splitter using intelligent chunking + +## Base Classes + +* [**BaseTextSplitter**](./models/splitters/base_text_splitter.md) - Abstract base for text splitters +* [**BaseSplitter**](./models/splitters/base_splitter.md) - Foundation class for all splitter implementations + +## Quick Start + +```python +from scaledp.models.splitters.TextSplitter import TextSplitter + +# Create a text splitter +splitter = TextSplitter( + inputCol="document", + outputCol="chunks", + chunk_size=500, + chunk_overlap=50, +) + +# Use in a Spark pipeline +result_df = splitter.transform(input_df) +``` + +## Related Schemas + +* [**Data Schemas**](./schemas.md) - Complete schema documentation + * [**Document**](./schemas/document.md) - Input document schema + * [**TextChunks**](./schemas/text_chunks.md) - Output chunks schema + * [**Box**](./schemas/box.md) - Bounding box schema + +## Features + +- **Semantic Splitting** - Intelligently split text at natural boundaries +- **Distributed Processing** - Scale to large datasets with Spark +- **Metadata Preservation** - Keep document information through splitting +- **Error Handling** - Graceful error reporting and recovery +- **Pipeline Integration** - Seamless integration with PySpark ML pipelines + +For detailed information and examples, see the [Text Splitters Documentation](./models/splitters/index.md). 
diff --git a/poetry.lock b/poetry.lock index 555769b..bc0e58a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5294,6 +5294,30 @@ dev = ["cython-lint (>=0.12.2)", "doit (>=0.36.0)", "mypy (==1.10.0)", "pycodest doc = ["intersphinx_registry", "jupyterlite-pyodide-kernel", "jupyterlite-sphinx (>=0.16.5)", "jupytext", "matplotlib (>=3.5)", "myst-nb", "numpydoc", "pooch", "pydata-sphinx-theme (>=0.15.2)", "sphinx (>=5.0.0,<8.0.0)", "sphinx-copybutton", "sphinx-design (>=0.4.0)"] test = ["Cython", "array-api-strict (>=2.0,<2.1.1)", "asv", "gmpy2", "hypothesis (>=6.30)", "meson", "mpmath", "ninja ; sys_platform != \"emscripten\"", "pooch", "pytest", "pytest-cov", "pytest-timeout", "pytest-xdist", "scikit-umfpack", "threadpoolctl"] +[[package]] +name = "semantic-text-splitter" +version = "0.28.0" +description = "" +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "semantic_text_splitter-0.28.0-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:625ed49fae4be45ef624da1b57326356c33d26dc275b4115c20afe8d1e7abab1"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:d185880c21ae2028261efe0271d7d6e2a90bf933179e850857b99648dc637496"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:f92c11610c7b2999033f8b5bc139133008d9a2b6a71daea312c7d22ce3199fe0"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-manylinux_2_28_armv7l.whl", hash = "sha256:f44b77e5f06059b5d30035ce29880da40d61a0519b6b3292a88ac59135c54079"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-manylinux_2_28_ppc64le.whl", hash = "sha256:de4742f5d04f92fbca553aefae5febb772793c939044ea40028b00ca8e4c47b9"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-manylinux_2_28_s390x.whl", hash = "sha256:76db639fda2c8d56347e38b9ee2d355887406f6bc410d02f646dad27557760ef"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-manylinux_2_28_x86_64.whl", hash = 
"sha256:dbe46461c9ea58fcad04fffdab20af9653929134ef700113522865fbbd0b8af9"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-win32.whl", hash = "sha256:f26b5cfda7fcff81a829ce9bb10858838872602a7bffe2e7ca1e97d5d18143b0"}, + {file = "semantic_text_splitter-0.28.0-cp310-abi3-win_amd64.whl", hash = "sha256:a793d98114c088ca11eeefbfe392946adfe4d1eb421b12ce213cb818348d3614"}, + {file = "semantic_text_splitter-0.28.0.tar.gz", hash = "sha256:2cdd968fe1411abedc23aa283858def9178288b1e8ab2357bb91cccf2e8cf920"}, +] + +[package.extras] +docs = ["pdoc"] +test = ["pytest", "tokenizers", "tree-sitter-python"] + [[package]] name = "send2trash" version = "1.8.3" @@ -6459,4 +6483,4 @@ paddle = [] [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "e3513c9d3fa60d18fc08b4c95ad59c7f682c02f30bfa1bf72df23b2214cb7536" +content-hash = "ee4297467bca0a395d309d277d45b3a86294a26a7846654903ba3f68219464f0" diff --git a/pyproject.toml b/pyproject.toml index c00b631..d2d63de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ pyclipper = "^1.3.0.post6" onnxruntime = "1.22.0" opencv-python = "^4.12.0.88" sentence-transformers = {version ="^5.1.2", optional = true} +semantic-text-splitter = "^0.28.0" [tool.poetry.extras] diff --git a/scaledp/models/splitters/BaseSplitter.py b/scaledp/models/splitters/BaseSplitter.py new file mode 100644 index 0000000..526e51f --- /dev/null +++ b/scaledp/models/splitters/BaseSplitter.py @@ -0,0 +1,31 @@ +from abc import ABC + +from pyspark.ml import Transformer + +from scaledp.params import ( + HasChunkOverlap, + HasChunkSize, + HasInputCol, + HasKeepInputData, + HasNumPartitions, + HasOutputCol, + HasPartitionMap, + HasWhiteList, +) + + +class BaseSplitter( + Transformer, + HasInputCol, + HasOutputCol, + HasKeepInputData, + HasWhiteList, + HasNumPartitions, + HasPartitionMap, + HasChunkSize, + HasChunkOverlap, + ABC, +): + """ + Abstract base class for text splitters. 
+ """ diff --git a/scaledp/models/splitters/BaseTextSplitter.py b/scaledp/models/splitters/BaseTextSplitter.py new file mode 100644 index 0000000..dc61650 --- /dev/null +++ b/scaledp/models/splitters/BaseTextSplitter.py @@ -0,0 +1,177 @@ +import json +from abc import abstractmethod +from types import MappingProxyType +from typing import Any + +import pandas as pd +from pyspark import keyword_only +from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable +from pyspark.sql.functions import lit, pandas_udf, udf + +from scaledp.params import HasColumnValidator, HasDefaultEnum +from scaledp.schemas.Document import Document +from scaledp.schemas.TextChunks import TextChunks + +from .BaseSplitter import BaseSplitter + + +class BaseTextSplitter( + BaseSplitter, + DefaultParamsReadable, + DefaultParamsWritable, + HasColumnValidator, + HasDefaultEnum, +): + """ + Abstract base class for text splitters that split text into chunks. + Provides common functionality for text splitting operations. + """ + + defaultParams = MappingProxyType( + { + "inputCol": "document", + "outputCol": "chunks", + "keepInputData": True, + "chunk_size": 500, + "chunk_overlap": 0, + "numPartitions": 1, + "partitionMap": False, + }, + ) + + @keyword_only + def __init__(self, **kwargs: Any): + super(BaseTextSplitter, self).__init__() + self._setDefault(**self.defaultParams) + self._set(**kwargs) + + def get_params(self): + """Get transformer parameters as JSON string.""" + return json.dumps({k.name: v for k, v in self.extractParamMap().items()}) + + @abstractmethod + def split(self, document: Document) -> TextChunks: + """ + Split a document into chunks. + + Args: + document: The document to split + + Returns: + TextChunks object containing the chunks and metadata + """ + + def transform_udf(self, document_struct): + """ + Transform UDF that splits text into chunks. 
+ + Args: + document_struct: A Document struct containing path, text, type, and bboxes + + Returns: + TextChunks object containing the chunks + """ + # document_struct is already a Document object + result = self.split(document_struct) + return result + + @classmethod + def transform_udf_pandas( + cls, + documents: pd.Series, + params: pd.Series, + ) -> pd.DataFrame: + """ + Transform pandas Series of Documents using the splitter. + + Args: + documents: Series containing Document objects (as Row-like objects from Arrow) + params: Series containing splitter parameters + + Returns: + DataFrame with TextChunks results + """ + params_dict = json.loads(params.iloc[0]) + splitter = cls(**params_dict) + results = [] + for doc_row in documents: + # Convert Row to Document + # When using pandas_udf with Arrow, the struct comes as + # a Row object with field attributes + try: + if isinstance(doc_row, Document): + doc = doc_row + else: + # Try to get attributes from Row object + doc = Document( + path=doc_row.path, + text=doc_row.text, + type=doc_row.type, + bboxes=doc_row.bboxes, + exception=getattr(doc_row, "exception", ""), + ) + output = splitter.split(doc) + except (AttributeError, TypeError, Exception) as e: + # If something goes wrong, create an error result + output = TextChunks( + path="", + chunks=[], + exception=str(e), + processing_time=0.0, + ) + + # Convert to dict to ensure proper schema + results.append( + { + "path": output.path, + "chunks": output.chunks, + "exception": output.exception, + "processing_time": output.processing_time, + }, + ) + return pd.DataFrame(results) + + def _transform(self, dataset): + """ + Transform a DataFrame by splitting text into chunks. 
+ + Args: + dataset: Input DataFrame with Document struct column + + Returns: + DataFrame with chunks column added + """ + params = self.get_params() + out_col = self.getOutputCol() + input_col = self.getInputCol() + + # Validate input column exists + if input_col not in dataset.columns: + raise ValueError(f"Column {input_col} not found in dataset") + + # Validate input column + validated_input_col = self._validate(input_col, dataset) + + if not self.getPartitionMap(): + # Regular mode: use UDF + result = dataset.withColumn( + out_col, + udf(self.transform_udf, TextChunks.get_schema())(validated_input_col), + ) + else: + # Pandas mode: use pandas_udf + if self.getNumPartitions() > 0: + dataset = dataset.coalesce(self.getNumPartitions()) + + result = dataset.withColumn( + out_col, + pandas_udf(self.transform_udf_pandas, TextChunks.get_schema())( + validated_input_col, + lit(params), + ), + ) + + if not self.getKeepInputData(): + result = result.drop(validated_input_col) + + return result diff --git a/scaledp/models/splitters/TextSplitter.py b/scaledp/models/splitters/TextSplitter.py new file mode 100644 index 0000000..46875f8 --- /dev/null +++ b/scaledp/models/splitters/TextSplitter.py @@ -0,0 +1,49 @@ +import time +from types import MappingProxyType +from typing import Any + +from pyspark import keyword_only +from semantic_text_splitter import TextSplitter as SemanticTextSplitter + +from scaledp.schemas.Document import Document +from scaledp.schemas.TextChunks import TextChunks + +from .BaseTextSplitter import BaseTextSplitter + + +class TextSplitter(BaseTextSplitter): + """ + Text splitter implementation using semantic_text_splitter library. 
+ """ + + defaultParams = MappingProxyType({**BaseTextSplitter.defaultParams}) + + @keyword_only + def __init__(self, **kwargs: Any): + super(TextSplitter, self).__init__() + self._setDefault(**self.defaultParams) + self._set(**kwargs) + # Note: splitter is created lazily in split() method because + # chunk_size and chunk_overlap may be changed after initialization + + def _get_splitter(self): + """Get or create the semantic text splitter with current parameters.""" + chunk_size = self.getOrDefault("chunk_size") + return SemanticTextSplitter(chunk_size) + + def split(self, document: Document) -> TextChunks: + start_time = time.time() + try: + splitter = self._get_splitter() + chunks = splitter.chunks(document.text) + exception = "" + except Exception as e: + chunks = [] + exception = str(e) + processing_time = time.time() - start_time + return TextChunks( + path=document.path, + chunks=chunks, + exception=exception, + processing_time=processing_time, + ) diff --git a/scaledp/models/splitters/__init__.py b/scaledp/models/splitters/__init__.py new file mode 100644 index 0000000..0cd98e9 --- /dev/null +++ b/scaledp/models/splitters/__init__.py @@ -0,0 +1,5 @@ +from .BaseSplitter import BaseSplitter +from .BaseTextSplitter import BaseTextSplitter +from .TextSplitter import TextSplitter + +__all__ = ["BaseSplitter", "BaseTextSplitter", "TextSplitter"] diff --git a/scaledp/params.py b/scaledp/params.py index 5a6c4ce..632d2b2 100644 --- a/scaledp/params.py +++ b/scaledp/params.py @@ -864,3 +864,53 @@ def setLabels(self, value: list[str]) -> Any: Sets the value of :py:attr:`labels`. """ return self._set(labels=value) + + +class HasChunkSize(Params): + """ + Mixin for param chunk_size: size of text chunks. + """ + + chunk_size = Param( + Params._dummy(), + "chunk_size", + "Size of text chunks for splitting.", + typeConverter=TypeConverters.toInt, + ) + + def getChunkSize(self) -> int: + """ + Gets the value of chunk_size or its default value. 
@dataclass(order=True)
class TextChunks:
    """Result of splitting one document's text into chunks.

    Produced by text splitters and used (via :meth:`get_schema`) as the
    Spark struct schema of the splitter output column.
    """

    # Source document path, propagated from the input Document.
    path: Optional[str]
    # The produced text chunks; empty when splitting failed.
    chunks: Optional[list[str]]
    # Error message captured during splitting; empty string on success.
    exception: Optional[str] = ""
    # Wall-clock splitting time in seconds.
    processing_time: Optional[float] = 0.0

    @staticmethod
    def get_schema():
        # Derive the Spark StructType from the dataclass fields.
        return map_dataclass_to_struct(TextChunks)


# Register the dataclass so the project's dataclass<->struct machinery
# can (de)serialize TextChunks values.
register_type(TextChunks, TextChunks.get_schema)
def test_text_splitter_custom_parameters():
    """Custom constructor arguments must be reflected in the params."""
    overrides = {
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "inputCol": "my_document",
        "outputCol": "text_chunks",
    }
    splitter = TextSplitter(**overrides)

    # Every override we passed in should be readable back unchanged.
    for param_name, expected in overrides.items():
        assert splitter.getOrDefault(param_name) == expected
def test_text_splitter_split_method_with_exception():
    """A failing split is reported via the exception field, not raised."""
    # text=None makes the underlying splitter fail.
    broken_doc = Document(path="test.txt", text=None, type="text", bboxes=[])

    outcome = TextSplitter().split(broken_doc)

    # The path is preserved, the error is captured, and no chunks remain.
    assert outcome.path == "test.txt"
    assert outcome.exception != ""
    assert not outcome.chunks
    assert outcome.processing_time > 0
@pytest.mark.skip(
    reason="pandas_udf schema handling needs refinement for Document structs",
)
def test_text_splitter_pipeline_pandas(text_splitter_df):
    """TextSplitter should also work in partitionMap (pandas_udf) mode."""
    splitter = TextSplitter(
        inputCol="document",
        outputCol="chunks",
        chunk_size=200,
        partitionMap=True,
    )

    # Run the single-stage pipeline and keep only the output column;
    # cache before collecting for performance.
    transformed = PipelineModel(stages=[splitter]).transform(text_splitter_df)
    rows = transformed.select("chunks").cache().collect()

    # At least one row came back and it exposes the 'chunks' field.
    assert len(rows) > 0
    assert hasattr(rows[0], "chunks")

    # Every row split cleanly and produced non-empty, attributed chunks.
    for row in rows:
        chunk_result = row.chunks
        assert chunk_result.exception == ""
        assert chunk_result.chunks is not None
        assert len(chunk_result.chunks) > 0
        assert chunk_result.path is not None
        assert chunk_result.processing_time > 0