diff --git a/llmfoundry/command_utils/__init__.py b/llmfoundry/command_utils/__init__.py index 756e611..39cbd13 100644 --- a/llmfoundry/command_utils/__init__.py +++ b/llmfoundry/command_utils/__init__.py @@ -3,6 +3,10 @@ from llmfoundry.command_utils.data_prep.convert_dataset_hf import ( convert_dataset_hf, convert_dataset_hf_from_args, + DataSplitConstants, + DatasetConstants, + add_dataset_config, + CONSTS, ) from llmfoundry.command_utils.data_prep.convert_dataset_json import ( convert_dataset_json, @@ -45,6 +49,10 @@ 'eval_from_yaml', 'convert_dataset_hf', 'convert_dataset_hf_from_args', + 'add_dataset_config', + 'DataSplitConstants', + 'DatasetConstants', + 'CONSTS', 'convert_dataset_json', 'convert_dataset_json_from_args', 'convert_delta_to_contrastive_mds', diff --git a/llmfoundry/command_utils/data_prep/convert_dataset_hf.py b/llmfoundry/command_utils/data_prep/convert_dataset_hf.py index 69b5e68..9e836a9 100644 --- a/llmfoundry/command_utils/data_prep/convert_dataset_hf.py +++ b/llmfoundry/command_utils/data_prep/convert_dataset_hf.py @@ -160,6 +160,9 @@ def __init__( CONSTS = {'allenai/c4': c4constants, 'the_pile': pileconstants} +def add_dataset_config(name, splits): + global CONSTS + CONSTS[name] = splits def build_hf_dataset( dataset_name: str, @@ -348,6 +351,8 @@ def convert_dataset_hf( else: mode = ConcatMode.NO_CONCAT built_tokenizer = None + if tokenizer: + built_tokenizer = build_tokenizer(tokenizer, tokenizer_kwargs) columns = {'text': 'str'} for split_name in splits: @@ -377,7 +382,7 @@ def convert_dataset_hf( ) loader = build_dataloader( dataset=hf_dataset, - batch_size=512, + batch_size=1, num_workers=num_workers, ) samples = generate_samples( diff --git a/llmfoundry/data/data.py b/llmfoundry/data/data.py index 17b28e1..e792a66 100644 --- a/llmfoundry/data/data.py +++ b/llmfoundry/data/data.py @@ -161,13 +161,14 @@ def __iter__(self) -> Iterable[dict[str, NDArray]]: ) iids = encoded['input_ids'] buffer = buffer + self.bos_tokens + iids + self.eos_tokens - while len(buffer) >= self.max_length: + while len(buffer) >= self.max_length or len(buffer) > 0: concat_sample = buffer[:self.max_length] buffer = buffer[self.max_length:] if self.should_wrap else [] yield { # convert to ndarray to store in MDS format 'tokens': np.asarray(concat_sample, dtype=np.int32), } + break def stream_remote_local_validate( diff --git a/scripts/data_prep/convert_dataset_hf.py b/scripts/data_prep/convert_dataset_hf.py index 3b89386..16dd91d 100644 --- a/scripts/data_prep/convert_dataset_hf.py +++ b/scripts/data_prep/convert_dataset_hf.py @@ -4,7 +4,7 @@ """Streaming dataset conversion scripts for C4 and The Pile.""" from argparse import ArgumentParser, Namespace -from llmfoundry.command_utils import convert_dataset_hf_from_args +from llmfoundry.command_utils import convert_dataset_hf_from_args, DatasetConstants, DataSplitConstants, add_dataset_config, CONSTS def parse_args() -> Namespace: diff --git a/scripts/data_prep/dataset_constants_split_config.py b/scripts/data_prep/dataset_constants_split_config.py new file mode 100644 index 0000000..40262d0 --- /dev/null +++ b/scripts/data_prep/dataset_constants_split_config.py @@ -0,0 +1,36 @@ +from llmfoundry.command_utils import DatasetConstants, DataSplitConstants, add_dataset_config + +def generate_constants(chars_per_sample, chars_per_token, label=None, splits=("full", 1, 10, 100, 1000)): + ds_const = DatasetConstants( + chars_per_sample=chars_per_sample, # Computed over validation set + chars_per_token=chars_per_token, # OpenAI estimate + ) + 
total_rows = None + # we generate only train and test use --data_subset --out_root + ds_const.splits[f"train"] = DataSplitConstants( + hf_split="train", + folder_split=f"train", + raw_samples=total_rows, + truncated_samples=total_rows, + ) + + ds_const.splits[f"test"] = DataSplitConstants( + hf_split="test", + folder_split=f"test", + raw_samples=total_rows, + truncated_samples=total_rows, + ) + return ds_const + + +def register_new_datasets(target = "LocalResearchGroup"): + _finemath = generate_constants(12163, 4) + add_dataset_config(f"{target}/split-finemath", _finemath) + _tulu = generate_constants(12163, 4) + add_dataset_config(f"{target}/split-tulu-3-sft-olmo-2-mixture", _tulu) + _numina = generate_constants(12163, 4) + add_dataset_config(f"{target}/split-NuminaMath-CoT", _numina) + _pythonedu = generate_constants(12163, 4) + add_dataset_config(f"{target}/split-avelina-python-edu", _pythonedu) + _glaive = generate_constants(12163, 4) + add_dataset_config(f"{target}/split-glaive-code-assistant-v3", _glaive) diff --git a/scripts/data_prep/download_repo.py b/scripts/data_prep/download_repo.py new file mode 100644 index 0000000..5242a1b --- /dev/null +++ b/scripts/data_prep/download_repo.py @@ -0,0 +1,70 @@ +from argparse import ArgumentParser, Namespace, BooleanOptionalAction +from huggingface_hub import HfApi, login +import os + + +def main(args): + api = HfApi() + datasets = { + "tulu": { + "target": f"{args.repo}/split-tulu-3-sft-olmo-2-mixture", + }, + "numina": { + "target": f"{args.repo}/split-NuminaMath-CoT", + }, + "finemath" :{ + "target": f"{args.repo}/split-finemath", + }, + "glaive" : { + "target": f"{args.repo}/split-glaive-code-assistant-v3", + }, + "avelinapythonedu": { + "target": f"{args.repo}/split-avelina-python-edu", + }, + } + + for ds in args.dataset: + ld = f"{args.out}/{ds}" + datadown = datasets[ds]["target"] + print(f"downloading {datadown=} to {ld=}\n") + local_dir = api.snapshot_download( + repo_id=datadown, + repo_type="dataset", + local_dir=ld, + ) + +def parse_args() -> Namespace: + """Parse commandline arguments.""" + parser = ArgumentParser( + description= + "Downloads tokenized versions of train/test 1M, 100k, 10k, 1k", + ) + parser.add_argument( + "--dataset", + nargs="+", + choices=["tulu", "numina", "finemath", "glaive", "avelinapythonedu"], + default=["tulu", "numina", "finemath", "glaive", "avelinapythonedu"], + ) + + parser.add_argument( + "--repo", + default="LocalResearchGroup", + help="repo containing tokenizations", + ) + + parser.add_argument( + "--out", + default=".", + help="local download folder", + ) + + parsed = parser.parse_args() + return parsed + + +if __name__ == "__main__": + args = parse_args() + if not os.environ.get("HUGGING_FACE_HUB_TOKEN"): + print("No Hugging Face token found. 
Please login.") + login() + main(args) \ No newline at end of file diff --git a/scripts/data_prep/preproc/__init__.py b/scripts/data_prep/preproc/__init__.py new file mode 100644 index 0000000..d9b6ae4 --- /dev/null +++ b/scripts/data_prep/preproc/__init__.py @@ -0,0 +1,9 @@ +from preproc.preprocs import pre_ml_glaive, pre_ml_tulu, pre_ml_numina +__all__ = [ + "pre_ml_glaive", + "pre_ml_tulu", + "pre_ml_numina", + # "pre_glaive", + # "pre_tulu", + # "pre_numina", +] diff --git a/scripts/data_prep/preproc/preprocs.py b/scripts/data_prep/preproc/preprocs.py new file mode 100644 index 0000000..b3b69b9 --- /dev/null +++ b/scripts/data_prep/preproc/preprocs.py @@ -0,0 +1,21 @@ +from llmfoundry.data.finetuning.tasks import ( + DatasetConstructor, +) + +dataset_constructor = DatasetConstructor() + +@dataset_constructor.register(f"LocalResearchGroup/split-tulu-3-sft-olmo-2-mixture") +def pre_ml_tulu(inp: dict): + return {"prompt": inp["prompt"], "response": inp["response"]} + + +@dataset_constructor.register(f"LocalResearchGroup/split-NuminaMath-CoT") +def pre_ml_numina(inp: dict): + return {"prompt": inp["prompt"], "response": inp["response"]} + + +@dataset_constructor.register(f"LocalResearchGroup/split-glaive-code-assistant-v3") +def pre_ml_glaive(inp: dict): + return {"prompt": inp["prompt"], "response": inp["response"]} + + diff --git a/scripts/data_prep/split_hf_datasets.py b/scripts/data_prep/split_hf_datasets.py new file mode 100644 index 0000000..f665488 --- /dev/null +++ b/scripts/data_prep/split_hf_datasets.py @@ -0,0 +1,372 @@ +from argparse import ArgumentParser, Namespace, BooleanOptionalAction + +from datasets import load_dataset, load_from_disk, DatasetDict +from huggingface_hub import HfApi, login +from pathlib import Path + +from convert_finetuning_dataset import convert_finetuning_dataset_from_args +import os + +import dataset_constants_split_config +from llmfoundry.command_utils import convert_dataset_hf_from_args + + +def save_to_parquet(combined: DatasetDict, out_ds_path: Path): + data_files = {} + for split, dataset in combined.items(): + filename = out_ds_path /f"{split}.parquet" + data_files[split] = filename + if not Path(filename).exists(): + print(f"Saving {filename}") + dataset.to_parquet(filename) + else: + print(f"{filename} already exist. 
Skipping...") + return data_files + +def create_size_ablation(dataset, total_rows): + """Create a subset with a given percentage of the original data""" + train_size = int(total_rows * 0.9) + return { + "train": dataset["train"].shuffle(42).select(range(train_size)), + "test": dataset["test"].shuffle(42).select(range(total_rows - train_size)), + } + + +def push_ablations(raw_datasets, ablations, hf_repo, config_name, private, shard_size): + print(f"creating ablations from {len(raw_datasets['train'])}/{len(raw_datasets['test'])}") + for label in ablations: + match label[-1]: + case "M": + ds = create_size_ablation(raw_datasets, int(label[:-1]) * 1_000_000) + case "k": + ds = create_size_ablation(raw_datasets, int(label[:-1]) * 1_000) + case _: + ds = raw_datasets + + dsdict = DatasetDict( + { + "train": ds["train"], + "test": ds["test"], + }, + ) + + print(f"\nUploading ablation {label} train/val") + + dsdict.push_to_hub(hf_repo, config_name=label, private=private, max_shard_size=shard_size) + + +def pull_n_push( + hf_ds_tgt, + hf_ds_src, + ds_name=None, + after_pull=None, + test_size: float = 0.1, + seed: int = 42, + saving2parquet=False, + ablations = ("full", "1M", "100k", "10k", "1k"), + private=True, + shard_size: str = "300MB", + purge_cache=False, +): + banner = f"Loading dataset {hf_ds_src}/{'default' if ds_name is None else ds_name}" + print("#"*len(banner)) + print(banner) + print(f"path={hf_ds_src=}, name={ds_name=}, split=train") + print("#"*len(banner)) + + dataset = load_dataset(path=hf_ds_src, name=ds_name, split="train") + if after_pull is not None: + dataset = after_pull(dataset) + dataset = dataset.train_test_split(test_size=test_size, seed=seed) + dsd = DatasetDict({"train": dataset["train"], "test": dataset["test"]}) + + if saving2parquet: + b = f"Saving parquet to {hf_ds_tgt} train/test" + print("=" * len(b)) + print(b) + print("=" * len(b)) + out_ds_path = Path(hf_ds_tgt) + out_ds_path.mkdir(parents=True, exist_ok=True) + data_files = save_to_parquet(dsd, out_ds_path.absolute()) + + push_ablations(dsd, ablations, hf_ds_tgt, ds_name, private, shard_size) + + if purge_cache: + dataset.cleanup_cache_files() + + +def preproc_chatml(inp: dict, k_prompt:str, k_response: str): + """Format dataset into ChatML template.""" + prompt = ( + "<|im_start|>system\nYou are a helpful AI assistant named SmolLM, trained by Local Research Group<|im_end|>\n" + f"<|im_start|>user\n{inp[k_prompt]}\n<|im_end|>\n" + ) + response = ( + f"<|im_start|>assistant\n{inp[k_response]}<|im_end|>\n" + "<|endoftext|>" + ) + return {"prompt": prompt, "response": response} + +def pre_ml_tulu(inp: dict): + return preproc_chatml(inp, "prompt", "response") + + +def pre_ml_numina(inp: dict): + return preproc_chatml(inp, "problem", "solution") + + +def pre_ml_glaive(inp: dict): + return preproc_chatml(inp, "question", "answer") + +def filter_tulu(dataset): + print(f"Original dataset rows {len(dataset)}") + dataset = dataset.filter(lambda r: r["source"] is not None and "aya" not in r["source"] and len(r["messages"]) == 2) + print("tulu", dataset.features) + dataset = dataset.remove_columns(["source", "dataset"]) + def extract_qa(messages): + user_question = next((msg["content"] for msg in messages if msg["role"] == "user"), None) + assistant_response = next((msg["content"] for msg in messages if msg["role"] == "assistant"), None) + return {"prompt": user_question, "response": assistant_response} + + # Apply function to dataset + dataset = dataset.map(lambda example: extract_qa(example["messages"])) + dataset = 
dataset.remove_columns(["messages"])
+    print("new tulu features: ", dataset.features)
+    dataset = dataset.map(lambda example: pre_ml_tulu(example))
+    print(f" current rows {len(dataset)}")
+    return dataset
+
+def process_numina(dataset):
+    print("numina", dataset.features)
+    # remove a column that, in a batch of 512, only has 2 rows, which breaks the PyTorch collate!
+    dataset = dataset.remove_columns("messages")
+    dataset = dataset.map(lambda example: pre_ml_numina(example))
+    print("new numina features", dataset.features)
+    return dataset
+
+def process_glaive(dataset):
+    print("glaive", dataset.features)
+
+    def extract_qa(messages):
+        return pre_ml_glaive(messages)
+
+    dataset = dataset.map(lambda example: extract_qa(example))
+    print("glaive new features:", dataset.features)
+
+    return dataset
+
+
+def upload_token_folder(local_path, target_repo):
+    print(f"upload_token_folder({str(local_path.relative_to('.'))}, {target_repo})")
+    api = HfApi()
+    r = api.upload_folder(
+        folder_path=local_path,
+        repo_id=target_repo,
+        repo_type="dataset",
+        path_in_repo=str(local_path.relative_to(".")),
+    )
+    print(f"token uploaded result: {r}")
+
+
+def create_pretraining_tokens(args, datasets, tokenizer="HuggingFaceTB/SmolLM2-135M"):
+    # tokenize the new dataset splits using the registered configurations
+    max_seq_len = 8192
+    for s in args.source:
+        d = datasets[s]
+        folder = d["target"].split("/")[1]
+        ablations = d["ablations"] if not args.one_k else ("1k",)  # override ablation config from cmd line arg
+        for ablation in ablations:
+            if d["kind"] == "pretrain":
+                print("\nconvert_dataset_hf_from_args for", s, ablation)
+                convert_dataset_hf_from_args(
+                    dataset=d["target"],
+                    data_subset=ablation,
+                    splits=["train", "test"],
+                    out_root=f"tokenized/{s}/{ablation}",
+                    compression="zstd",
+                    concat_tokens=max_seq_len,
+                    tokenizer=tokenizer,
+                    tokenizer_kwargs=f'{{"model_max_length": {max_seq_len} }}',
+                    bos_text=None,
+                    eos_text="<|endoftext|>",
+                    no_wrap=True,
+                    num_workers=None,
+                )
+            elif d["kind"] == "instruct":
+                print("\nconvert_finetuning_dataset_from_args for", s, ablation)
+                tokenizer = "HuggingFaceTB/SmolLM2-135M-instruct"
+                convert_finetuning_dataset_from_args(
+                    d["target"],
+                    f"{ablation}",  # data_subset
+                    ["train", "test"],
+                    d["preproc"],
+                    [],
+                    False,
+                    f"tokenized/{s}/{ablation}",  # out_root
+                    None,
+                    "zstd",
+                    None,  # num_workers
+                    tokenizer,  # tokenizer
+                    None,
+                    max_seq_len,  # max_seq_len
+                    "none",  # target_prompts
+                    "last",  # target_responses
+                    False,  # encoder_decoder
+                )
+            else:
+                raise RuntimeError(f"Unknown dataset kind: {d['kind']}")
+
+
+def create_tokenized_upload(args, datasets):
+    # upload all tokenized folders to the corresponding repo/folder
+    for s in args.source:
+        d = datasets[s]
+        ablations = d["ablations"] if not args.one_k else ("1k",)  # override ablation config from cmd line arg
+        print(f"Uploading {ablations} from {d} to {d['target']} from {Path('.').absolute()}")
+        for ablation in ablations:
+            target_repo = d["target"]
+            local_path = Path(".") / f"tokenized/{s}/{ablation}"
+            print(f"\nUploading {ablation} to {target_repo} from {str(local_path)}\n")
+            upload_token_folder(local_path, target_repo)
+    print("upload finished.")
+
+def upload_splits(args, datas):
+    for arg in args.source:
+        d = datas[arg]
+        ds_name = d.get("ds_name", None)
+        ablations = d["ablations"] if not args.one_k else ("1k",)  # override ablation config from cmd line arg
+        pull_n_push(
+            d["target"],
+            d["src"],
+            ds_name=ds_name,
+            ablations=ablations,
+            after_pull=d.get("after_pull", None),
+        )
+
+
+def main(args):
+    datasets = {
+        "tulu": {
+            "src": "allenai/tulu-3-sft-olmo-2-mixture",
+            "target": f"{args.target_repo}/split-tulu-3-sft-olmo-2-mixture",
+            "after_pull": filter_tulu,
+            "ablations": ("full", "100k", "10k", "1k"),
+            "preproc": "preproc:pre_ml_tulu",
+            "kind": "instruct",
+        },
+        "numina": {
+            "src": "AI-MO/NuminaMath-CoT",
+            "target": f"{args.target_repo}/split-NuminaMath-CoT",
+            "after_pull": process_numina,
+            "ablations": ("full", "100k", "10k", "1k"),
+            "preproc": "preproc:pre_ml_numina",
+            "kind": "instruct",
+        },
+        "glaive": {
+            "src": "glaiveai/glaive-code-assistant-v3",
+            "after_pull": process_glaive,
+            "target": f"{args.target_repo}/split-glaive-code-assistant-v3",
+            "ablations": ("full", "100k", "10k", "1k"),
+            "preproc": "preproc:pre_ml_glaive",
+            "kind": "instruct",
+        },
+        "finemath": {
+            "src": "HuggingFaceTB/finemath",
+            "ds_name": "finemath-4plus",
+            "target": f"{args.target_repo}/split-finemath",
+            "ablations": ("full", "1M", "100k", "10k", "1k"),
+            "kind": "pretrain",
+        },
+        "avelinapythonedu": {
+            "src": "Avelina/python-edu",
+            "target": f"{args.target_repo}/split-avelina-python-edu",
+            "ablations": ("full", "1M", "100k", "10k", "1k"),
+            "kind": "pretrain",
+        },
+    }
+    dataset_constants_split_config.register_new_datasets(args.target_repo)
+    if args.split:
+        print(f"splitting: {args.source}")
+        upload_splits(args, datasets)
+        print(f"splitting: {args.source} finished.")
+    if args.tokenize_local:
+        print(f"tokenizing: {args.source}")
+        create_pretraining_tokens(args, datasets)
+        print(f"tokenizing: {args.source} finished.")
+    if args.upload_tokens:
+        print(f"uploading tokens: {args.source}")
+        create_tokenized_upload(args, datasets)
+        print(f"uploading tokens: {args.source} finished.")
+
+
+def parse_args() -> Namespace:
+    """Parse commandline arguments."""
+    parser = ArgumentParser(
+        description="""Tool to help build splits, tokenize and upload tokens.
+
+    1. --split           split `source` dataset into train/test splits (full, 1M, 100k, 10k, 1k) and upload them to `target_repo` (default LRG@hf)
+    2. --tokenize-local  tokenize the splits locally into the tokenized/ folder
+    3. 
--upload-tokens upload local tokens to target repo + + + python data_prep/split_hf_datasets.py --source avelinapythonedu --split --no-tokenize-local --no-upload-tokens + python data_prep/split_hf_datasets.py --source avelinapythonedu --no-split --tokenize-local --no-upload-tokens + python data_prep/split_hf_datasets.py --source avelinapythonedu --no-split --no-tokenize-local --upload-tokens + + add `--one-k` to target only 1k rows split + """, + ) + parser.add_argument( + "--source", + nargs="+", + choices=[ + "tulu", + "numina", + "glaive", + "finemath", + "avelinapythonedu", + ], + default=["tulu", "numina", "glaive", "finemath", "avelinapythonedu"], + ) + + parser.add_argument( + "--target_repo", + default="LocalResearchGroup", + help="target repo to upload splits and tokenizations default is `LocalResearchGroup`", + ) + + parser.add_argument( + "--split", + action=BooleanOptionalAction, + default=True, + help="Make splits out of source datasets", + ) + parser.add_argument( + "--tokenize-local", + action=BooleanOptionalAction, + default=True, + help="generate local tokenization for splits", + ) + parser.add_argument( + "--upload-tokens", + action=BooleanOptionalAction, + default=True, + help="upload local tokenization to target repo", + ) + parser.add_argument( + "--one-k", + action=BooleanOptionalAction, + default=False, + help="for testing/checks only process 1k split", + ) + + parsed = parser.parse_args() + return parsed + + +if __name__ == "__main__": + args = parse_args() + if not os.environ.get("HUGGING_FACE_HUB_TOKEN"): + print("No Hugging Face token found. Please login.") + login() + main(args) diff --git a/scripts/modal/modal_script.py b/scripts/modal/modal_script.py index 32cbbdc..88affca 100644 --- a/scripts/modal/modal_script.py +++ b/scripts/modal/modal_script.py @@ -35,7 +35,7 @@ image = image.add_local_file(TRAIN_YAML, f"/llm-foundry/scripts/train/yamls/finetune/{TRAIN_YAML}") @app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], - max_containers=1) + concurrency_limit=1) def get_stats(): import subprocess @@ -58,7 +58,7 @@ def get_stats(): @app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], volumes={DATASETS_VOLUME_MOUNT_PATH: DATASETS_VOLUME}, - max_containers=1) + concurrency_limit=1) def convert_c4_small_dataset(): import subprocess import os @@ -127,7 +127,7 @@ def convert_finetuning_dataset(): @app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], volumes={MODEL_CHECKPOINT_VOLUME_MOUNT_PATH: MODEL_CHECKPOINT_VOLUME}, - max_containers=1) + concurrency_limit=1) def view_model_checkpoints(save_folder: str=None): import os print("\nModel checkpoint files and sizes:") @@ -214,7 +214,7 @@ def run_aim_server(run_folder: str): @app.function(gpu=TRAINING_GPU, image=image, timeout=12*3600, secrets=[Secret.from_name("LRG")], volumes={MODEL_CHECKPOINT_VOLUME_MOUNT_PATH: MODEL_CHECKPOINT_VOLUME, DATASETS_VOLUME_MOUNT_PATH: DATASETS_VOLUME}, - max_containers=1) + concurrency_limit=1) def train_with_aim(run_ts: str, yaml_path: str = "train/yamls/pretrain/smollm2-135m.yaml"): import subprocess, time @@ -276,7 +276,7 @@ def convert_model_to_hf(checkpoint_path: str, yaml_path: str = "", upload_to_hf: @app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], volumes={MODEL_CHECKPOINT_VOLUME_MOUNT_PATH: MODEL_CHECKPOINT_VOLUME}, - max_containers=1) + concurrency_limit=1) def evaluate_model(checkpoint_path: str): import subprocess, 
os from pathlib import Path @@ -307,7 +307,7 @@ def evaluate_model(checkpoint_path: str): @app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], volumes={MODEL_CHECKPOINT_VOLUME_MOUNT_PATH: MODEL_CHECKPOINT_VOLUME}, - max_containers=1) + concurrency_limit=1) def generate_responses(checkpoint_path: str, prompts: list[str]|str|None=None): import subprocess, os from pathlib import Path @@ -343,7 +343,7 @@ def generate_responses(checkpoint_path: str, prompts: list[str]|str|None=None): @app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], volumes={MODEL_CHECKPOINT_VOLUME_MOUNT_PATH: MODEL_CHECKPOINT_VOLUME}, - max_containers=1) + concurrency_limit=1) def push_folder_to_hf(folder_path: str, repo_id: str | None = None, repo_type: str = "model", private: bool = True): """Upload model checkpoint to HuggingFace Hub.""" from huggingface_hub import HfApi @@ -411,6 +411,57 @@ def process_datasets(): if result.stderr: print("Process dataset errors:", result.stderr) +@app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], + volumes={DATASETS_VOLUME_MOUNT_PATH: DATASETS_VOLUME}, + concurrency_limit=1) +def pull_hf_to_folder(): + import subprocess + import os + + # Change to llm-foundry/scripts directory at the start + os.chdir("/llm-foundry/scripts") + print(f"Working directory: {os.getcwd()}") + + # Step 1: pull all tokens + print(f"Downloading repos to {DATASETS_VOLUME_MOUNT_PATH}/") + data_prep_cmd = [ + PYTHON_PATH, # Use the correct Python interpreter + "data_prep/download_repo.py", + "--out", f"{DATASETS_VOLUME_MOUNT_PATH}/", + ] + result = subprocess.run(data_prep_cmd, capture_output=True, text=True) + print(result.stdout) + if result.stderr: + print("Download data errors:", result.stderr) + + DATASETS_VOLUME.commit() + +@app.function(gpu=TRAINING_GPU, image=image, timeout=4*3600, secrets=[Secret.from_name("LRG")], + concurrency_limit=1) +def process_datasets(): + import subprocess + import os + + # Change to llm-foundry/scripts directory at the start + os.chdir("/llm-foundry/scripts") + print(f"Working directory: {os.getcwd()}") + + # process all datasets: tulu, numina, finemath, glaive, avelinapythonedu + # 1. pull original, split and upload splits (`--no-split` to skip) + # 2. tokenize dataset(s) (`--no-tokenize` to skip) + # 3. upload (tokenized) folders (`--no-upload` to skip) + # `--source` can be 1 or the allowed list (dont pass to process all registered datasets) + print(f"Processing datasets...") + data_prep_cmd = [ + PYTHON_PATH, # Use the correct Python interpreter + "data_prep/split_hf_datasets.py", + ] + result = subprocess.run(data_prep_cmd, capture_output=True, text=True) + print(result.stdout) + if result.stderr: + print("Process dataset errors:", result.stderr) + + @app.local_entrypoint() def main(): from pathlib import Path
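
Usage sketch: the new add_dataset_config hook registers a dataset in CONSTS so that convert_dataset_hf can resolve its split constants; this is how scripts/data_prep/dataset_constants_split_config.py wires up the LocalResearchGroup splits. A minimal sketch of registering and converting a hypothetical dataset follows; the repo id "my-org/my-dataset", the out_root path, and the character-count estimates are placeholders, not values taken from this change.

    from llmfoundry.command_utils import (
        DataSplitConstants,
        DatasetConstants,
        add_dataset_config,
        convert_dataset_hf_from_args,
    )

    # Placeholder character-count estimates; sample counts are left as None,
    # mirroring dataset_constants_split_config.generate_constants.
    consts = DatasetConstants(chars_per_sample=12163, chars_per_token=4)
    for split in ("train", "test"):
        consts.splits[split] = DataSplitConstants(
            hf_split=split,
            folder_split=split,
            raw_samples=None,
            truncated_samples=None,
        )
    add_dataset_config("my-org/my-dataset", consts)  # hypothetical HF repo id

    # Convert the registered dataset the same way create_pretraining_tokens()
    # in split_hf_datasets.py drives it for the "pretrain" kind.
    convert_dataset_hf_from_args(
        dataset="my-org/my-dataset",
        data_subset=None,  # or an HF config name such as "1k"
        splits=["train", "test"],
        out_root="tokenized/my-dataset/full",
        compression="zstd",
        concat_tokens=8192,
        tokenizer="HuggingFaceTB/SmolLM2-135M",
        tokenizer_kwargs='{"model_max_length": 8192 }',
        bos_text=None,
        eos_text="<|endoftext|>",
        no_wrap=True,
        num_workers=None,
    )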