Added a datatrove-based pipeline for filtering tokenized data using scores. #235
base: master
Conversation
BlueCrescent commented on Jul 25, 2025
- Included an example configuration file.
- Added datatrove and pydantic-settings to requirements.
- Note that modalities is also required for the pipeline to work, but it is not included in the requirements file.
Pull Request Overview
This PR implements a data filtering pipeline using datatrove for filtering tokenized data based on scores. The pipeline processes JSONL files containing scores for data samples and filters corresponding tokenized datasets based on configurable thresholds.
- Adds a complete datatrove-based filtering pipeline with score parsing and data filtering components (a sketch of the idea follows this list)
- Introduces configuration management using pydantic-settings for both local and Slurm execution environments
- Updates dependencies to include datatrove and pydantic-settings
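To make the filtering idea concrete, here is a minimal sketch of threshold-based filtering over a JSONL score file. All names and the file layout are illustrative assumptions; the actual PR implements this as datatrove pipeline steps (`ScoresParser`, `DataFiltering`).

```python
import json
from pathlib import Path


# Hypothetical sketch, not the PR's code: keep only samples whose scores
# meet every configured threshold.
def passing_indices(scores_jsonl: Path, thresholds: dict[str, float]) -> list[int]:
    keep: list[int] = []
    with scores_jsonl.open() as f:
        for idx, line in enumerate(f):
            scores = json.loads(line)  # e.g. {"quality": 0.83, "toxicity": 0.1}
            if all(scores.get(name, float("-inf")) >= cutoff
                   for name, cutoff in thresholds.items()):
                keep.append(idx)
    return keep


# Example: keep rows whose "quality" score is at least 0.8.
# passing_indices(Path("scores/shard_00.jsonl"), {"quality": 0.8})
```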
Reviewed Changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py | Implements ScoresParser class for reading JSONL score files and mapping to tokenized data |
| src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py | Implements DataFiltering class for filtering datasets based on score thresholds |
| src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py | Main pipeline orchestration with configuration management and execution settings |
| pyproject.toml | Adds datatrove and pydantic-settings dependencies |
| configs/data_processing/example_filter_pipeline_config.yaml | Example configuration file for the filtering pipeline |
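For orientation, configuration via pydantic-settings usually follows a pattern like the sketch below. The field names are guesses for illustration, not the schema from the example YAML file.

```python
from pathlib import Path

from pydantic_settings import BaseSettings


# Illustrative only -- these field names are assumptions, not the PR's schema.
class FilterPipelineSettings(BaseSettings):
    scores_folder: Path            # JSONL files with per-sample scores
    tokenized_data_folder: Path    # packed datasets to be filtered
    output_folder: Path            # where filtered datasets are written
    thresholds: dict[str, float]   # score name -> minimum value to keep a sample


# Values can come from init kwargs or environment variables; with extra
# setup they could be loaded from a YAML file like the shipped example config.
settings = FilterPipelineSettings(
    scores_folder=Path("scores"),
    tokenized_data_folder=Path("data/tokenized"),
    output_folder=Path("data/filtered"),
    thresholds={"quality": 0.8},
)
```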
Comments suppressed due to low confidence (1)
src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py:241
- [nitpick] The error message could be more helpful by providing an example of how to use the FilterPipelineBuilder class directly or where to find documentation.
"and use the FilterPipelineBuilder class directly."
(Six resolved Copilot review threads — two each on step_data_filtering.py, step_score_parsing.py, and filter_pipeline.py; most are marked outdated.)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…g pipeline and adapted the codebase for new changes from main
… execution settings
…dle duplicates in score parsing
```python
        document = self.get_document_from_dict(doc_content, filepath, 0)
        return [document]

    def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]:
```
The scores are emitted in lexicographic order of the document IDs: IDs such as sample1, sample2, sample10 will be reordered to sample1, sample10, sample2, so the thresholds get applied to the wrong rows in the packed dataset. Please preserve the original file order (e.g. rely on insertion order or track the original line index when deduplicating).
Fixed in a0698c2
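For illustration, an order-preserving deduplication might look like the sketch below (the record keys are assumptions; the actual change is in a0698c2). A plain dict preserves insertion order in Python 3.7+, so first occurrences stay in file order and no lexicographic sort ever happens.

```python
import json


# Hypothetical sketch: deduplicate score rows by document ID while keeping
# the original file order; "sample2" stays before "sample10".
def parse_scores_preserving_order(filepath: str) -> list[dict[str, float]]:
    rows_by_id: dict[str, dict[str, float]] = {}
    with open(filepath) as f:
        for line in f:
            record = json.loads(line)  # assumed shape: {"id": ..., "scores": {...}}
            # First occurrence wins; later duplicates are ignored.
            rows_by_id.setdefault(record["id"], record["scores"])
    return list(rows_by_id.values())
```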
(Two resolved review threads on step_score_parsing.py.)
```python
        output_folder (Path): The folder where the filtered datasets will be saved.
        thresholds (dict[str, float]): A dictionary where keys are score names and values are the
            thresholds to filter samples.
        hash_to_base_file_mapping_csv (Path): A CSV file mapping base file hashes to their corresponding paths.
```
Seems like an artifact
Removed in f2e8f24
(One resolved review thread on step_data_filtering.py.)
```python
        sbatch_args = values.get("sbatch_args") or {}
        if isinstance(sbatch_args, _DictConfig):
            sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True)  # type: ignore[arg-type]
```
Won't this throw an error unless you import OmegaConf?
OmegaConf is imported.
Hmm? `from omegaconf import DictConfig as _DictConfig` does not import `OmegaConf`. I am not sure why the code is not throwing an error here; `from omegaconf import DictConfig as _DictConfig, OmegaConf` should be the way.
Fixed in e791792
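For reference, a minimal sketch of the corrected import and conversion (the surrounding validator is abbreviated into a standalone function here):

```python
from omegaconf import DictConfig as _DictConfig, OmegaConf


def normalize_sbatch_args(values: dict) -> dict:
    """Abbreviated sketch of the validator logic discussed above."""
    sbatch_args = values.get("sbatch_args") or {}
    if isinstance(sbatch_args, _DictConfig):
        # Convert an OmegaConf DictConfig into a plain dict.
        sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True)
    return sbatch_args
```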
(One resolved, outdated review thread on filter_pipeline.py.)
```diff
@@ -0,0 +1,173 @@
+import json
+import logging
```
Unused import, please remove it
Removed in 85e3f5c
| """ | ||
| Maps a base file path to the corresponding tokenized data path. | ||
| Args: | ||
| base_file_path (str): The path of the base file. |
Please update the docstrings to reflect the new changes
Fixed in 1c3656c
```python
_TOKENIZER_CACHE: dict[str, Any] = {}

HEADER_SIZE = 64  # Mimics EmbeddedStreamData.HEADER_SIZE_IN_BYTES (simplified for tests)
DATA_SECTION_LEN_BYTES = 8
```
Unused constants `DATA_SECTION_LEN_BYTES` and `TOKEN_SIZE_DESC_LEN_BYTES`
Removed in 1c3656c
```python
    from modalities.dataloader.filter_packed_data import filter_dataset
except ImportError:
    logging.error("The filtering pipeline requires the 'modalities' package to be installed.")
    exit(1)
```
Using `exit(1)` is not ideal; I would say something like

```python
try:
    from modalities.dataloader.filter_packed_data import filter_dataset
except ImportError as exc:
    raise ImportError(
        "The filtering pipeline requires the optional dependency 'modalities'. "
        "Install it via `pip install modalities` and try again."
    ) from exc
```

would be better.
Fixed in 379df23
| """ | ||
|
|
||
| name = "ScoresParser" | ||
| # type = "Parser" |
Please remove this line altogether
Fixed in 379df23
(One resolved review thread on step_score_parsing.py.)
AbasKhan left a comment
Apart from a minor change, the rest looks really good. Well done!
```python
    ]
    return pipeline


if __name__ == "__main__":
```
Do we need this here? I think we should have an entry point in main.py instead.
Added in e19f4a0
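A rough sketch of what such a main.py entry point could look like (`run_filter_pipeline` is an assumed helper name, not necessarily what e19f4a0 added):

```python
# main.py -- hypothetical entry point; the actual names may differ.
import argparse

from ml_filter.data_processing.score_based_filtering.filter_pipeline import (
    run_filter_pipeline,  # assumed helper exposed by the pipeline module
)


def main() -> None:
    parser = argparse.ArgumentParser(description="Score-based filtering of tokenized data")
    parser.add_argument("config", help="Path to the pipeline YAML config")
    args = parser.parse_args()
    run_filter_pipeline(args.config)


if __name__ == "__main__":
    main()
```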
AbasKhan left a comment
I think we can merge it, but I would suggest adding Mehdi or Max as a second reviewer.