Conversation
| def open(self, path: Path, mode: FileMode = FileMode.READ) -> BinaryIO: ... | ||
|
|
||
| @abstractmethod | ||
| def cat(self, path: Path) -> bytes: |
There was a problem hiding this comment.
it's useful for faster files downloading
cbalioglu
left a comment
There was a problem hiding this comment.
I believe one major thing missing right now is some benchmark numbers as well as verification that the new file system implementation works end-to-end (i.e. a real world training run, asset metadata loading, Hugging Face model exports, and any other place where we used regular file system calls work without any regressions). Have you been able to verify those?
| Asset download manager for local files and fsspec-compatible URI schemes. | ||
|
|
||
| For local files (file:// scheme), it returns the local path directly. | ||
| For fsspec schemes (s3://, gs://, etc.), it returns the URI as a Path, |
There was a problem hiding this comment.
Not really sure whether we should treat schemes beyond file:// as "local" in a download manager. It defeats the purpose of download manager if we do not download and cache a remote file.
There was a problem hiding this comment.
what is the exact purpose of that class btw ?
or it's a pure naming issue LocalAssetDownloadManager -> GlobalAssetDownloadManager ?
src/fairseq2/checkpoint/manager.py
Outdated
| if not self._file_system.is_local: | ||
| return | ||
|
|
||
| def _sync_distributed_writes(self) -> None: |
There was a problem hiding this comment.
This whole function can become noop in case the file system is not local (I assume fsspec guarantees some form of strong consistency, if not, this function will likely become tricky). We do not need a redundant barrier call.
There was a problem hiding this comment.
just wanted to confirm that
There was a problem hiding this comment.
yes, i think s3 should get some transactional consistency indeed
There was a problem hiding this comment.
i verified and continue doing so on real run where the chkpt happening on s3. |
|
i need to move checkpoints/model.yaml to the remove location as well... now we've locally and in s3 folder: 8.8M fs2_s3test_12/ws_1.d2b3ae4f/step_1200/data_reader
4.7G fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg
4.0K fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg.run
4.0K fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg.stderr
0 fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg.stdout
5.6G fs2_s3test_12/ws_1.d2b3ae4f/step_1200/model
9.3G fs2_s3test_12/ws_1.d2b3ae4f/step_1200/optimizer
12K fs2_s3test_12/ws_1.d2b3ae4f/step_1200/trainer |
|
the issue is that s3fs removes folder content but does not remove the folders themselves ... |
|
|
||
|
|
||
| @final | ||
| class CheckpointDir: |
There was a problem hiding this comment.
what is the right place for this dubby holder class ? maybe we can even remove it
|
@cbalioglu : what tests should run to make sure that it's ok ? |
|
ok. catching up hg models folder for copy |
| cmd, stdout=out_fp, stderr=err_fp | ||
| ) | ||
| else: | ||
| result = self._process_runner.run_text(cmd, capture_output=True) |
There was a problem hiding this comment.
generally speaking, we cannot pass self._file_system into subprocess, i dont know that is the best option here ?
There was a problem hiding this comment.
i cannot pass env={} here become varenv can be useful to reinit s3 filesystem from aws related varenvs
| def remove_directory(self, path: Path) -> None: | ||
| # For S3/remote filesystems, rmdir only removes empty directories | ||
| # Use rm with recursive=True to delete directory and all contents | ||
| self._fsspec.rm(self.get_short_uri(path), recursive=True) |
There was a problem hiding this comment.
there's an issue with phantoms s3 directories that cannot be removed...
| fp = self._file_system.open_text(run_id_file) | ||
|
|
||
| return fp.read() | ||
| return self._file_system.cat(run_id_file).decode("utf-8") |
There was a problem hiding this comment.
i still dont understand why we have filesystem abstraction here if we cannot really use other remote fs for the metrics logging...
| metadata_dumper: ModelMetadataDumper, | ||
| output_dir: Path, | ||
| gangs: Gangs, | ||
| checkpoint_dir: Path | None = None, |
There was a problem hiding this comment.
i dont know where else we need to propagate checkpoint_dir ...
| ) | ||
|
|
||
| result = self._process_runner.run_text( | ||
| cmd, stdout=out_fp, stderr=err_fp, env={} |
There was a problem hiding this comment.
do you know why env={} was necessary here ?
9a06bc7 to
1d0203f
Compare
|
some rebase issues - fixing them |
What does this PR do? Please describe:
This PR allows to save and reload the checkpoints from an remote location using fsspec integration!
Wrapping the standard fsspec interface to fs2 FileSystem API
checkpoint-dirsupport in CLITested with e2e example "
python -m recipes.lm.sft "/tmp/tmp/fs2_s3test_1" --checkpoint-dir s3://bucket/folder/ \ --config-file recipes/lm/sft/configs/llama3_2_1b_instruct_gsm8k.yaml \ --config regime.checkpoint_every_n_steps=200with restarting as well!
Does your PR introduce any breaking changes? If yes, please list them:
List of all backwards-incompatible changes.
Check list: