adding integration with fsspec FileSystem#1126

Open
artemru wants to merge 46 commits into main from adding_fsspec

@artemru (Contributor) commented Apr 18, 2025

What does this PR do? Please describe:

This PR makes it possible to save and reload checkpoints from a remote location using an fsspec integration.

It wraps the standard fsspec interface in the fs2 FileSystem API:

  • Using the wrapped FS instead of the native one
  • Adding some basic path resolvers (they should be used in dataset loading and the model registry)
  • Adding some basic configurations for s3, hf, gcp
  • Adding a cat method (for possibly faster file reading in concrete fsspec implementations)
  • Changing is_local to is_local_path
  • Introducing checkpoint-dir support in the CLI
  • Fixing the file-moving logic for hf and normal checkpoints (s3 cannot do a move operation the way nfs can)
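
To illustrate the kind of path resolver mentioned in the list above, here is a minimal sketch that classifies a location by its URI scheme. It uses only the standard library. The function body, and the choice to treat file:// and drive-letter paths as local, are assumptions for illustration and not the PR's actual is_local_path implementation.

```python
from urllib.parse import urlparse


def is_local_path(location: str) -> bool:
    """Return True if `location` has no URI scheme or uses file://.

    Hypothetical helper: the real is_local_path in the PR may differ.
    """
    scheme = urlparse(location).scheme
    # A one-letter "scheme" is most likely a Windows drive letter (C:\...).
    return scheme in ("", "file") or len(scheme) == 1


for loc in ("/tmp/tmp/fs2_s3test_1", "file:///tmp/ckpt", "s3://bucket/folder/"):
    print(loc, is_local_path(loc))
```

A resolver like this lets the CLI decide whether --checkpoint-dir needs the fsspec-backed file system or the native one.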

Tested with the following e2e example:

python -m recipes.lm.sft "/tmp/tmp/fs2_s3test_1" --checkpoint-dir s3://bucket/folder/ \
  --config-file recipes/lm/sft/configs/llama3_2_1b_instruct_gsm8k.yaml \
  --config regime.checkpoint_every_n_steps=200

Restarting the run from the saved checkpoints was tested as well.

Does your PR introduce any breaking changes? If yes, please list them:
List of all backwards-incompatible changes.

Check list:

  • Was the content of this PR discussed and approved via a GitHub issue? (no need for typos or documentation improvements)
  • Did you read the contributor guideline?
  • Did you make sure that your PR does only one thing instead of bundling different changes together?
  • Did you make sure to update the documentation with your changes? (if necessary)
  • Did you write any new necessary tests?
  • Did you verify new and existing tests pass locally with your changes?
  • Did you update the CHANGELOG? (no need for typos, documentation, or minor internal changes)

@facebook-github-bot added the CLA Signed label (managed by the Facebook bot; authors need to sign the CLA before a PR can be reviewed) Apr 18, 2025
@artemru self-assigned this Apr 18, 2025
@artemru marked this pull request as ready for review April 23, 2025 13:10
@artemru requested a review from cbalioglu as a code owner April 23, 2025 13:10

    def open(self, path: Path, mode: FileMode = FileMode.READ) -> BinaryIO: ...

    @abstractmethod
    def cat(self, path: Path) -> bytes:

Contributor Author

It's useful for faster file downloads.
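
As a rough sketch of why a dedicated cat can help: a generic implementation has to go through open() and read(), while a remote backend can override cat with a single bulk request. The reduced FileSystem and LocalFileSystem classes below are hypothetical stand-ins, not the fs2 classes. Unlike the diff above, where cat is abstract, this sketch gives it a default body.

```python
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import BinaryIO


class FileSystem(ABC):
    """Reduced, hypothetical stand-in for the fs2 FileSystem interface."""

    @abstractmethod
    def open(self, path: Path) -> BinaryIO: ...

    def cat(self, path: Path) -> bytes:
        # Generic fallback: read the whole file through open().
        # A remote backend could override this with one bulk request.
        with self.open(path) as fp:
            return fp.read()


class LocalFileSystem(FileSystem):
    def open(self, path: Path) -> BinaryIO:
        return path.open("rb")


with tempfile.TemporaryDirectory() as tmp:
    run_id_file = Path(tmp) / "run_id"
    run_id_file.write_bytes(b"ws_1.d2b3ae4f")
    print(LocalFileSystem().cat(run_id_file).decode("utf-8"))
```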

@cbalioglu (Contributor) left a comment

I believe one major thing missing right now is some benchmark numbers as well as verification that the new file system implementation works end-to-end (i.e. a real world training run, asset metadata loading, Hugging Face model exports, and any other place where we used regular file system calls work without any regressions). Have you been able to verify those?

Asset download manager for local files and fsspec-compatible URI schemes.

For local files (file:// scheme), it returns the local path directly.
For fsspec schemes (s3://, gs://, etc.), it returns the URI as a Path,

Contributor

Not really sure whether we should treat schemes beyond file:// as "local" in a download manager. It defeats the purpose of download manager if we do not download and cache a remote file.

Contributor Author

What is the exact purpose of that class, by the way?
Or is it purely a naming issue (LocalAssetDownloadManager -> GlobalAssetDownloadManager)?
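
For context on the reviewer's point, a download manager in the usual sense fetches a remote asset once and serves later requests from a local cache. The sketch below shows that behaviour under stated assumptions: download_asset, the cache layout, and the fetch callable are all hypothetical and not part of the PR or of fairseq2.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Callable


def download_asset(uri: str, cache_dir: Path, fetch: Callable[[str], bytes]) -> Path:
    """Return a local cached copy of `uri`, fetching only on a cache miss.

    `fetch` is a hypothetical callable that returns the remote bytes.
    """
    key = hashlib.sha256(uri.encode("utf-8")).hexdigest()[:16]
    target = cache_dir / key / Path(uri).name
    if not target.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(fetch(uri))
    return target


with tempfile.TemporaryDirectory() as tmp:
    fake_fetch = lambda uri: b"weights"  # stands in for a real remote read
    local = download_asset("s3://bucket/folder/model.pt", Path(tmp), fake_fetch)
    print(local.name, local.read_bytes())
```

Returning the remote URI unchanged, as the docstring in the diff describes, skips this caching step, which is the behaviour being questioned.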

        if not self._file_system.is_local:
            return

    def _sync_distributed_writes(self) -> None:

Contributor

This whole function can become noop in case the file system is not local (I assume fsspec guarantees some form of strong consistency, if not, this function will likely become tricky). We do not need a redundant barrier call.

Contributor Author

just wanted to confirm that

Contributor Author

Yes, I think S3 should indeed provide some transactional consistency.
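
The suggestion in this thread, making the sync step a no-op for non-local file systems, can be sketched as follows. FakeGang and CheckpointSyncer are hypothetical stand-ins, and the sketch assumes (as the reviewer does) that the remote store is strongly consistent. What the real _sync_distributed_writes does on a local file system is not shown in this thread.

```python
from dataclasses import dataclass


@dataclass
class FakeGang:
    """Hypothetical stand-in for a process group; counts barrier calls."""

    barriers: int = 0

    def barrier(self) -> None:
        self.barriers += 1


@dataclass
class CheckpointSyncer:
    gang: FakeGang
    is_local: bool

    def sync_distributed_writes(self) -> None:
        # Assumption: a remote object store is strongly consistent,
        # so there is nothing to wait for and no barrier is needed.
        if not self.is_local:
            return
        # On a shared local file system, wait until all ranks are done.
        self.gang.barrier()


gang = FakeGang()
CheckpointSyncer(gang, is_local=False).sync_distributed_writes()
CheckpointSyncer(gang, is_local=True).sync_distributed_writes()
print(gang.barriers)
```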

@caciolai (Contributor) left a comment

Some minor comments

@artemru (Contributor Author) commented Jan 26, 2026

> I believe one major thing missing right now is some benchmark numbers as well as verification that the new file system implementation works end-to-end (i.e. a real world training run, asset metadata loading, Hugging Face model exports, and any other place where we used regular file system calls work without any regressions). Have you been able to verify those?

I have verified this, and continue to do so, on a real run where checkpointing happens on S3.
Is there an e2e workflow that I can reuse for tests and that checks that all saved primitives are correct?

@artemru (Contributor Author) commented Jan 26, 2026

I need to move checkpoints/model.yaml to the remote location as well...
OK, done: model definition + hg models.

Locally we now have:

0       /tmp/tmp/fs2_s3test_12/ws_1.d2b3ae4f/cache
4.0K    /tmp/tmp/fs2_s3test_12/ws_1.d2b3ae4f/config.yaml
192K    /tmp/tmp/fs2_s3test_12/ws_1.d2b3ae4f/logs
212K    /tmp/tmp/fs2_s3test_12/ws_1.d2b3ae4f/metrics
312K    /tmp/tmp/fs2_s3test_12/ws_1.d2b3ae4f/tb

and in the S3 folder:
model.yaml step_1000 step_1200 step_1400 step_1600 step_1800 step_2000 step_2200 step_2400 step_2600 step_800
with the following inner structure:

8.8M    fs2_s3test_12/ws_1.d2b3ae4f/step_1200/data_reader
4.7G    fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg
4.0K    fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg.run
4.0K    fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg.stderr
0       fs2_s3test_12/ws_1.d2b3ae4f/step_1200/hg.stdout
5.6G    fs2_s3test_12/ws_1.d2b3ae4f/step_1200/model
9.3G    fs2_s3test_12/ws_1.d2b3ae4f/step_1200/optimizer
12K     fs2_s3test_12/ws_1.d2b3ae4f/step_1200/trainer

@artemru (Contributor Author) commented Jan 26, 2026

the issue is that s3fs removes folder content but does not remove the folders themselves ...



@final
class CheckpointDir:
Contributor Author

What is the right place for this dummy holder class? Maybe we can even remove it.

@artemru (Contributor Author) commented Jan 26, 2026

@cbalioglu: which tests should I run to make sure that it's OK?

@artemru (Contributor Author) commented Jan 26, 2026

OK. Now also picking up the hg models folder for the copy.

cmd, stdout=out_fp, stderr=err_fp
)
else:
result = self._process_runner.run_text(cmd, capture_output=True)
Contributor Author (@artemru, Jan 26, 2026)

Generally speaking, we cannot pass self._file_system into a subprocess. I don't know what the best option is here.

Contributor Author

I cannot pass env={} here because the environment can be needed to reinitialize the S3 file system from AWS-related environment variables.
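
One possible middle ground between env={} and inheriting the whole environment is to forward only an allow-list of variables to the child process. The sketch below is an illustration, not the PR's approach: build_child_env, run_with_filtered_env, the "AWS_" prefix, and the PATH/HOME keep-list are all assumptions.

```python
import os
import subprocess
from typing import Mapping, Sequence, Tuple


def build_child_env(
    base: Mapping[str, str],
    prefixes: Tuple[str, ...] = ("AWS_",),
    keep: Tuple[str, ...] = ("PATH", "HOME"),
) -> dict:
    """Keep variables that start with one of `prefixes` or are listed in `keep`."""
    return {
        name: value
        for name, value in base.items()
        if name.startswith(prefixes) or name in keep
    }


def run_with_filtered_env(cmd: Sequence[str]) -> "subprocess.CompletedProcess[str]":
    # The child sees only the allow-listed part of the parent environment.
    return subprocess.run(
        cmd, env=build_child_env(os.environ), capture_output=True, text=True
    )
```

With this, a child process could still rebuild an S3 file system from credentials in the environment while unrelated variables stay out of it.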

    def remove_directory(self, path: Path) -> None:
        # For S3/remote filesystems, rmdir only removes empty directories
        # Use rm with recursive=True to delete directory and all contents
        self._fsspec.rm(self.get_short_uri(path), recursive=True)

Contributor Author

There's an issue with phantom S3 directories that cannot be removed...
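
One way such phantom directories can arise in an object store: the namespace is flat, "directories" are inferred from key prefixes, and a zero-byte marker key ending in "/" keeps a directory visible after its files are gone. The sketch below models this with a plain dict. Whether this is the cause of the behaviour seen here with s3fs is an assumption, and both helper functions are hypothetical.

```python
from typing import Dict, Set


def list_dirs(store: Dict[str, bytes], parent: str) -> Set[str]:
    """Infer the immediate "subdirectories" of `parent` from key prefixes."""
    parent = parent.rstrip("/") + "/"
    dirs = set()
    for key in store:
        if key.startswith(parent):
            rest = key[len(parent):]
            if "/" in rest:
                dirs.add(rest.split("/", 1)[0])
    return dirs


def remove_directory(store: Dict[str, bytes], path: str) -> None:
    """Delete every key under `path`, including a trailing-slash marker key."""
    prefix = path.rstrip("/") + "/"
    for key in [k for k in store if k.startswith(prefix)]:
        del store[key]


store = {
    "run/step_800/": b"",  # zero-byte directory marker
    "run/step_800/model": b"...",
    "run/step_1000/model": b"...",
}
remove_directory(store, "run/step_800")
print(sorted(list_dirs(store, "run")))
```

If only the file keys were deleted and the marker key were left behind, step_800 would still show up in the listing as an empty directory.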

fp = self._file_system.open_text(run_id_file)

return fp.read()
return self._file_system.cat(run_id_file).decode("utf-8")
Contributor Author

I still don't understand why we have a file system abstraction here if we cannot really use another remote file system for the metrics logging...

metadata_dumper: ModelMetadataDumper,
output_dir: Path,
gangs: Gangs,
checkpoint_dir: Path | None = None,
Contributor Author

I don't know where else we need to propagate checkpoint_dir...

)

result = self._process_runner.run_text(
cmd, stdout=out_fp, stderr=err_fp, env={}
Contributor Author

Do you know why env={} was necessary here?

@artemru (Contributor Author) commented Feb 6, 2026

There are some rebase issues; fixing them.

4 participants