feat: support for flux framework as hpc manager #3188
vsoch wants to merge 7 commits into kubeflow:master
Conversation
Pull Request Test Coverage Report for Build 21810351691 (Coveralls)
Pull request overview
This PR adds Flux Framework as a new Kubeflow Trainer runtime plugin and extends the Trainer APIs/CRDs so Flux-specific ML policy can be configured via TrainingRuntime/ClusterTrainingRuntime, along with example manifests.
Changes:
- Register a new fluxframework plugin and integrate it into the runtime framework plugin pipeline.
- Extend the MLPolicy API surface (Go types, CRDs, OpenAPI, Python client) with flux/FluxMLPolicySource (currently numProcPerNode).
- Add Flux plugin implementation + unit tests and provide example runtime + TrainJob YAMLs.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| pkg/runtime/runtime.go | Extends runtime policy/container abstractions to carry Flux policy + image/command fields. |
| pkg/runtime/framework/plugins/registry.go | Registers the Flux plugin in the framework plugin registry. |
| pkg/runtime/framework/plugins/jobset/jobset.go | Propagates runtime PodSet container Command/Image into JobSet apply spec. |
| pkg/runtime/framework/plugins/flux/flux.go | New Flux plugin: enforces Flux behavior, generates ConfigMap scripts, creates CURVE Secret, adds watches. |
| pkg/runtime/framework/plugins/flux/flux_test.go | New Flux plugin tests. |
| pkg/runtime/framework/core/framework_test.go | Updates framework tests to include Flux plugin in expected plugin sets. |
| pkg/client/applyconfiguration/utils.go | Adds applyconfiguration kind mapping for FluxMLPolicySource. |
| pkg/client/applyconfiguration/trainer/v1alpha1/mlpolicysource.go | Adds Flux field + builder method to MLPolicySource apply config. |
| pkg/client/applyconfiguration/trainer/v1alpha1/mlpolicy.go | Adds Flux builder method to MLPolicy apply config. |
| pkg/client/applyconfiguration/trainer/v1alpha1/fluxmlpolicysource.go | Generated apply config for FluxMLPolicySource. |
| pkg/apis/trainer/v1alpha1/* | Adds FluxMLPolicySource to Go API types + generated deepcopy/openapi. |
| manifests/base/crds/* | CRD schema updates to include mlPolicy.flux. |
| charts/kubeflow-trainer/crds/* | Helm CRD schema updates to include mlPolicy.flux. |
| api/python_api/kubeflow_trainer_api/models/* | Python client model updates for Flux ML policy. |
| api/openapi-spec/swagger.json | OpenAPI schema updates for FluxMLPolicySource. |
| examples/flux/* | Adds example Flux runtime + TrainJob manifests (batch + interactive). |
Comments suppressed due to low confidence (1)
pkg/runtime/runtime.go:87
runtime.Container gained Image and Command fields, but toPodSetContainer never populates them from the runtime PodSpec apply configs, so these values will always be empty unless manually set later; update the conversion to copy image/command from corev1ac.ContainerApplyConfiguration.
```go
type Container struct {
	Name         string
	Image        string
	Command      []string
	Env          []corev1ac.EnvVarApplyConfiguration
	Ports        []corev1ac.ContainerPortApplyConfiguration
	VolumeMounts []corev1ac.VolumeMountApplyConfiguration
}
```
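The suggested conversion fix might look like the sketch below, using simplified local stand-ins for corev1ac.ContainerApplyConfiguration and runtime.Container (field names mirror the PR, but the types and function here are illustrative, not the actual code):

```go
package main

import "fmt"

// containerApply is a simplified stand-in for
// corev1ac.ContainerApplyConfiguration: pointer fields, as in apply configs.
type containerApply struct {
	Name    *string
	Image   *string
	Command []string
}

// Container mirrors the runtime.Container shape from the PR.
type Container struct {
	Name    string
	Image   string
	Command []string
}

// toPodSetContainer (sketch): copy Image and Command from the apply
// configuration so they are no longer left empty, per the review comment.
func toPodSetContainer(ac containerApply) Container {
	c := Container{Command: ac.Command}
	if ac.Name != nil {
		c.Name = *ac.Name
	}
	if ac.Image != nil {
		c.Image = *ac.Image
	}
	return c
}

func main() {
	name, image := "node", "ghcr.io/flux-framework/flux:latest"
	c := toPodSetContainer(containerApply{Name: &name, Image: &image, Command: []string{"flux", "start"}})
	fmt.Println(c.Name, c.Image, c.Command)
}
```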
```sh
node_spec="-n2"
node_spec="${node_spec}"
flags="${node_spec} "
```

The Flux submit flags are hard-coded (node_spec="-n2") and ignore the configured fluxPolicy.NumProcPerNode and the TrainJob node count, so most jobs will run with the wrong parallelism; derive -n/-N (or equivalent) from NumProcPerNode and trainJob.Spec.Trainer.NumNodes.
```diff
-node_spec="-n2"
-node_spec="${node_spec}"
-flags="${node_spec} "
+# Derive node and task counts from environment when available.
+# FLUX_NUM_NODES is expected to be the TrainJob node count and
+# FLUX_NUM_PROC_PER_NODE is expected to be fluxPolicy.NumProcPerNode.
+if [[ -n "${FLUX_NUM_NODES}" ]] && [[ -n "${FLUX_NUM_PROC_PER_NODE}" ]]; then
+  total_tasks=$((FLUX_NUM_NODES * FLUX_NUM_PROC_PER_NODE))
+  node_spec="-N${FLUX_NUM_NODES} -n${total_tasks}"
+elif [[ -n "${FLUX_NUM_NODES}" ]]; then
+  node_spec="-N${FLUX_NUM_NODES}"
+elif [[ -n "${FLUX_NUM_PROC_PER_NODE}" ]]; then
+  node_spec="-n${FLUX_NUM_PROC_PER_NODE}"
+else
+  # Fallback to the previous hard-coded behavior.
+  node_spec="-n2"
+fi
+flags="${node_spec}"
```
This is a bug (oversight) converting from the previous implementation. I'll update this to have tasks == numProcPerNode. The main issue here is that the number of tasks for the job may not necessarily equate to the number of processes per node, but for most Trainer examples I've seen, this is what is expected. We also are not properly supporting GPUs. For that, flux needs to have a -g flag, which corresponds to GPUs per task.
@andreyvelich how should we address that? E.g., if we have a single node with 8 GPUs and we want the entire pod to consume the entire node (and all GPUs) we would do -g 8. E.g., "two nodes, each has 8 cores and each of the 8 cores is assigned to 1 gpu":

```sh
flux run -N2 -n 8 -g 1 /opt/multi-gpu-programming-models/mpi/jacobi -niter 5000
```

Here is a more realistic (complex) example:

```sh
flux run --cores-per-task 1 --env OMP_NUM_THREADS=1 -N16 -n 128 -g 1 -o gpu-affinity=per-task kripke --arch CUDA --layout GDZ --dset 8 --zones 128,128,128 --gset 16 --groups 64 --niter 50 --legendre 8 --quad 8 --procs 4,8,4
```

In the above, we want 16 physical nodes, each with 8 GPUs, so a total of 128 GPUs across the job. That means that each "slot" or "task" gets one GPU (-g 1) and just one core (--cores-per-task). The above does not say that each physical node has 128 "numProcPerNode", and it isn't clear how I'd specify this to run with the Kubeflow Trainer right now. Let's discuss.
There are other examples in that README if needed.
Can we start with a simple assignment: -n == .trainer.numProcPerNode * trainer.numNodes if it is set in the TrainJob, otherwise -n == .flux.numProcPerNode * .numNodes?
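The proposed assignment can be sketched as a small helper; buildNodeSpec, its names, and its defaults are illustrative only, not the PR's implementation:

```go
package main

import "fmt"

// buildNodeSpec derives flux run -N/-n flags from the TrainJob node count and
// the configured numProcPerNode. Zero or negative inputs fall back to 1, the
// default discussed in this thread.
func buildNodeSpec(numNodes, numProcPerNode int) string {
	if numNodes <= 0 {
		numNodes = 1
	}
	if numProcPerNode <= 0 {
		numProcPerNode = 1
	}
	// -N is the physical node count; -n is the total task count.
	return fmt.Sprintf("-N%d -n%d", numNodes, numNodes*numProcPerNode)
}

func main() {
	fmt.Println(buildNodeSpec(2, 8)) // two nodes, 8 procs each -> -N2 -n16
}
```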
@andreyvelich another thing I'm thinking is that there are cases when we want to leave out -n and just define -N (numNodes). I think tasks here is required to minimally be 1? Is this something we can support another way?
How is this handled if it can be a string? Is there a common function?
(trainer/pkg/apis/trainer/v1alpha1/trainingruntime_types.go, lines 204 to 212 in 54eab65)
@andreyvelich I just pushed a change that better covers these cases, but I need some help understanding the possible string definitions for the Trainer numProcPerNode. With "auto" we can at least support GPU - with flux when you do -N <n> and then --exclusive that essentially says "give me all the resources (cpu) on the node." What isn't clear is how that is different from cpu, and then what "gpu" means. For Flux, to enable GPU we do need to set the -g (gpus per task) flag, which requires knowing the number on the node and of course the number requested by the user per task.
> I think tasks here is required to minimally be 1? Is this something we can support another way?

Yes, we can set numProcPerNode to 1 by default.
> How is this handled if it can be a string? Is there a common function?

We are going to refactor this as part of this PR: #3239
We are going to accept only int values.
> For Flux, to enable GPU we do need to set the -g (gpus per task) flag, which requires knowing the number on the node and of course the number requested by the user per task.

Let's not worry about auto or other values in numProcPerNode, it is specific to PyTorch.
@vsoch For now, can we just check the container resources to automatically configure the appropriate command for Flux?
For example, if GPU is requested, we set -g in the command.
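The resource-driven flag construction suggested here could be sketched as follows; buildFluxFlags and the per-task GPU split are hypothetical names/choices for illustration, not the PR's actual implementation:

```go
package main

import "fmt"

// buildFluxFlags derives -N/-n from node and per-node process counts, and
// adds -g (GPUs per task) only when the container resources request GPUs.
// gpusPerNode would come from the "nvidia.com/gpu" limit or request.
func buildFluxFlags(numNodes, numProcPerNode int, gpusPerNode int64) string {
	if numProcPerNode <= 0 {
		numProcPerNode = 1
	}
	flags := fmt.Sprintf("-N%d -n%d", numNodes, numNodes*numProcPerNode)
	if gpusPerNode > 0 {
		// One task per process: split the node's GPUs evenly across tasks,
		// never going below one GPU per task.
		gpusPerTask := gpusPerNode / int64(numProcPerNode)
		if gpusPerTask < 1 {
			gpusPerTask = 1
		}
		flags += fmt.Sprintf(" -g%d", gpusPerTask)
	}
	return flags
}

func main() {
	// Matches the kripke example above: 16 nodes, 8 procs each, 8 GPUs/node.
	fmt.Println(buildFluxFlags(16, 8, 8)) // -N16 -n128 -g1
}
```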
```sh
foundroot=$(find $viewroot -maxdepth 2 -type d -path $viewroot/lib/python3\*) > /dev/null 2>&1
pythonversion=$(basename ${foundroot})
pythonversion=${viewroot}/bin/${pythonversion}
```
`foundroot=$(find ... ) > /dev/null 2>&1` redirects the assignment output rather than silencing find, and also loses the actual path(s); capture find output properly (and pick one directory) so pythonversion/PYTHONPATH are computed correctly.
```diff
-foundroot=$(find $viewroot -maxdepth 2 -type d -path $viewroot/lib/python3\*) > /dev/null 2>&1
-pythonversion=$(basename ${foundroot})
-pythonversion=${viewroot}/bin/${pythonversion}
+foundroot=$(find "$viewroot" -maxdepth 2 -type d -path "$viewroot"/lib/python3\* 2>/dev/null | head -n 1)
+pythonversion=$(basename "${foundroot}")
+pythonversion="${viewroot}/bin/${pythonversion}"
```
```go
// buildCurveSecret generates a cluster-wide curve certificate for flux
func (f *Flux) buildCurveSecret(trainJob *trainer.TrainJob) (*corev1ac.SecretApplyConfiguration, error) {
	// Generate a deterministic Secret Key from the UID
	secretSeed := sha256.Sum256([]byte(trainJob.UID))

	// Derive the Public Key using standard X25519 (CURVE25519)
	// ZeroMQ/Flux uses X25519.
	priv, err := ecdh.X25519().NewPrivateKey(secretSeed[:])
	if err != nil {
		return nil, fmt.Errorf("failed to create curve private key: %w", err)
	}
	pub := priv.PublicKey()

	// Encode both to Z85 (40 characters each)
	z85Secret := encodeZ85(priv.Bytes())
	z85Public := encodeZ85(pub.Bytes())
```
buildCurveSecret derives the CURVE secret key deterministically from the TrainJob UID, making it predictable to anyone who can read object metadata; generate a cryptographically random key once (store it in the Secret) and re-use it on subsequent reconciles (similar to the MPI SSH secret pattern).
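One way to follow this suggestion is to generate a random X25519 key pair and Z85-encode it, as sketched below; encodeZ85 here is a standalone implementation of ZeroMQ's Z85 encoding, and the whole snippet is illustrative rather than the PR's code (persisting and reusing the Secret is left out):

```go
package main

import (
	"crypto/ecdh"
	"crypto/rand"
	"fmt"
)

// z85Chars is the ZeroMQ Z85 alphabet (ZeroMQ RFC 32).
const z85Chars = "0123456789abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#"

// encodeZ85 encodes data (length must be a multiple of 4) as Z85 text;
// each 4-byte big-endian chunk becomes 5 base-85 characters, so a
// 32-byte CURVE key becomes the 40-character string Flux expects.
func encodeZ85(data []byte) string {
	out := make([]byte, 0, len(data)/4*5)
	for i := 0; i < len(data); i += 4 {
		v := uint32(data[i])<<24 | uint32(data[i+1])<<16 |
			uint32(data[i+2])<<8 | uint32(data[i+3])
		var chunk [5]byte
		for j := 4; j >= 0; j-- {
			chunk[j] = z85Chars[v%85]
			v /= 85
		}
		out = append(out, chunk[:]...)
	}
	return string(out)
}

func main() {
	// Random (not UID-derived) X25519 key pair, per the review suggestion;
	// the controller would store it in the Secret once and reuse it on
	// subsequent reconciles, similar to the MPI SSH secret pattern.
	priv, err := ecdh.X25519().GenerateKey(rand.Reader)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(encodeZ85(priv.Bytes())), len(encodeZ85(priv.PublicKey().Bytes())))
}
```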
```go
err := p.(framework.EnforceMLPolicyPlugin).EnforceMLPolicy(tc.info, tc.trainJob)
if err != nil {
	t.Fatalf("EnforceMLPolicy failed: %v", err)
}

if tc.info.RuntimePolicy.FluxPolicySource != nil && tc.info.TemplateSpec.ObjApply != nil {
	js := tc.info.TemplateSpec.ObjApply.(*v1alpha2.JobSetSpecApplyConfiguration)
	for _, rj := range js.ReplicatedJobs {
		if ptr.Deref(rj.Name, "") == constants.Node {
			podSpec := rj.Template.Spec.Template.Spec
			var icNames []string
			for _, ic := range podSpec.InitContainers {
				icNames = append(icNames, ptr.Deref(ic.Name, ""))
			}
			if diff := gocmp.Diff(tc.wantInitContainers, icNames); len(diff) != 0 {
				t.Errorf("Unexpected init containers (-want, +got): %s", diff)
			}
			for _, c := range podSpec.Containers {
				if ptr.Deref(c.Name, "") == constants.Node {
					if diff := gocmp.Diff(tc.wantCommand, c.Command); len(diff) != 0 {
						t.Errorf("Unexpected command (-want, +got): %s", diff)
					}
					if ptr.Deref(c.TTY, false) != tc.wantTTY {
						t.Errorf("Expected TTY %v, got %v", tc.wantTTY, ptr.Deref(c.TTY, false))
					}
				}
			}
		}
	}
}
```
Tests gate Flux assertions on tc.info.RuntimePolicy.FluxPolicySource, but the plugin checks RuntimePolicy.MLPolicySource.Flux, so the assertions are skipped and the test can pass without verifying mutations; update tests to use MLPolicySource.Flux and remove the unused FluxPolicySource field references.
```go
	jobSetSpec.ReplicatedJobs[psIdx].Template.Spec.Template.Spec.Containers[containerIdx].Command = container.Command
}
if container.Image != "" {
	jobSetSpec.ReplicatedJobs[psIdx].Template.Spec.Template.Spec.Containers[containerIdx].Image = &container.Image
```
In JobSet Build, assigning Image = &container.Image takes the address of the range loop variable field, so multiple containers can end up sharing the same pointer value; use a stable pointer (e.g., ptr.To(container.Image)) instead.
```diff
-jobSetSpec.ReplicatedJobs[psIdx].Template.Spec.Template.Spec.Containers[containerIdx].Image = &container.Image
+jobSetSpec.ReplicatedJobs[psIdx].Template.Spec.Template.Spec.Containers[containerIdx].Image = ptr.To(container.Image)
```
@andreyvelich would you like this change? I don't see why it would be an issue. It might even be beneficial if the value changes somewhere else (and then is changed here). It could also be irrelevant.
I think it should be fine to keep it with `&`.
@astefanutti @tenzen-y Any recommendations?
```python
from __future__ import annotations
import pprint
import re  # noqa: F401
```
Import of 're' is not used.
```diff
-import re  # noqa: F401
```
```go
trainJob.Annotations[AnnotationOriginalCommand] = originalCmd
trainJob.Annotations[AnnotationViewImage] = settings["FLUX_VIEW_IMAGE"]
```
I might be missing something, but why do you place these values to the TrainJob annotations?
The settings are global (and would be shared between instances) and annotations persist between creations. It also allows programmatic understanding of the creation by other Kubernetes controllers or the user.
Users can always check the actual JobSet to see the FLUX_VIEW_IMAGE and the original command, since we wrap it.
I would suggest to avoid using annotations to passing some state between the objects unless it is really necessary. cc @tenzen-y @astefanutti
@andreyvelich I can try to remove annotations, but we need somewhere to put the original command. It will not persist with the train job after we update it. The annotation served as a place to put it.
@vsoch Any specific reason to preserve the original command?
Do users require it for observability?
We need the original command to run their workflow in the MiniCluster. It gets replaced with an execution to the entrypoint. The fallback to put them both in one place would be to prefix the original command with the entrypoint, and then receive it as whatever arguments come in to the script.
@andreyvelich I just pushed a change that gets rid of the need to set annotations. We have the command as a suffix to the flux entrypoint, which captures it via all the args.
It's unclear to me at the moment how MiniCluster will be used with TrainJob, so we can discuss it after initial implementation.

> We have the command as a suffix to the flux entrypoint, which captures it via all the args.

That sounds good, thanks.
Specifically, there is no MiniCluster here (that is a Flux Operator CRD) and we aren't using the Flux Operator here. We can definitely discuss how Flux can be used in the context of TrainJob, looking forward to it!
@vsoch If you rebase your PR, it should fix the E2Es.
Force-push: 114ae38 to 57805da
Done! Hopefully didn't bork anything - I'll watch for issues.
andreyvelich left a comment
Thanks for the updates @vsoch! I left a few small comments.
It looks great, I think we should be ready to merge it.
/assign @astefanutti @tenzen-y
In case you want to provide more feedback on the initial Flux integration
```diff
@@ -0,0 +1,434 @@
+/*
```
We also need to add:
- Integration tests: https://github.com/converged-computing/trainer/blob/57805da503042c765bf2a422146bd050067b33f9/test/integration/controller/trainjob_controller_test.go#L925
- E2E tests: https://github.com/converged-computing/trainer/blob/57805da503042c765bf2a422146bd050067b33f9/test/e2e/e2e_test.go#L108
@vsoch Please can you create a tracking issue to add these as a followup.
[APPROVALNOTIFIER] This PR is NOT APPROVED.
@andreyvelich can you give feedback on getting resources - the trainJob container does not specify them, e.g., no Resources here:

```go
var gpusPerNode int64 = 0
trainerContainer := info.FindContainerByPodSetAncestorContainerName(constants.AncestorTrainer, constants.Node)
if trainerContainer != nil {
	if val, ok := trainerContainer.Resources.Limits["nvidia.com/gpu"]; ok {
		gpusPerNode = val.Value()
	} else if val, ok := trainerContainer.Resources.Requests["nvidia.com/gpu"]; ok {
		// Fallback to requests if limits aren't set
		gpusPerNode = val.Value()
	}
}
```
Just extract them from the Runtime first. Actually, just re-use the same helper functions we run in the Torch plugin:

```go
resourcesPerNode := ptr.Deref(runtime.ExtractResourcePerNodeFromRuntime(info), corev1.ResourceRequirements{})
if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil && jobTrainer.ResourcesPerNode != nil {
	resourcesPerNode = ptr.Deref(jobTrainer.ResourcesPerNode, corev1.ResourceRequirements{})
}
gpuQ := runtime.GetNumGPUPerNode(&resourcesPerNode)
```
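The fallback order in that snippet can be illustrated with simplified stand-ins for corev1.ResourceRequirements; the resources type and numGPUPerNode helper below are sketches of the idea, not the actual runtime helpers:

```go
package main

import "fmt"

// resources is a simplified stand-in for corev1.ResourceRequirements:
// plain name -> quantity maps for limits and requests.
type resources struct {
	Limits   map[string]int64
	Requests map[string]int64
}

// numGPUPerNode mirrors the order described in the comment: start from the
// runtime-level resources, let the TrainJob's trainer resources override
// them, then read the "nvidia.com/gpu" limit (falling back to requests).
func numGPUPerNode(runtimeRes, trainJobRes *resources) int64 {
	res := runtimeRes
	if trainJobRes != nil {
		res = trainJobRes
	}
	if res == nil {
		return 0
	}
	if v, ok := res.Limits["nvidia.com/gpu"]; ok {
		return v
	}
	return res.Requests["nvidia.com/gpu"] // zero if absent
}

func main() {
	rt := &resources{Limits: map[string]int64{"nvidia.com/gpu": 8}}
	fmt.Println(numGPUPerNode(rt, nil)) // taken from the runtime: 8
}
```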
Flux supports the majority of MPI flavors/variants, and can be used to bootstrap MPI as a plugin. It adds other features for scheduling and topology that can be used for simulations and ai/ml jobs. This changeset adds the plugin implementation, including the plugin module, tests, and an example with a small README to serve as documentation for the time being. Signed-off-by: vsoch <vsoch@users.noreply.github.com>
We still need to put the original command in an annotation to retrieve later, but others can be re-derived from the environment Signed-off-by: vsoch <vsoch@users.noreply.github.com>
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
Force-push: bd83d71 to 16c4162
@andreyvelich the test is failing because of a nil pointer reference in the function you suggested. I tried all the logical fixes for:

```go
nodeResources := runtime.ExtractResourcePerNodeFromRuntime(info)
```

and it always errors with this check:

```go
rJob.Template.Labels[constants.LabelTrainJobAncestor] == constants.AncestorTrainer
```

I can check up to that level for nil, still errors. I suspect this is something weird about the apply, so going to need to ask for another set of eyes on it.
@vsoch Your unit test has incorrect PodSet object here: https://github.com/converged-computing/trainer/blob/16c4162ea90298888927705b462fa27fc26a9ebc/pkg/runtime/framework/plugins/flux/flux_test.go#L93-L96

It should be like this, and you should remove:

```go
{
	Name:     constants.Node,
	Ancestor: ptr.To(constants.AncestorTrainer),
	Count:    ptr.To[int32](1),
},
```

Check example here: https://github.com/converged-computing/trainer/blob/16c4162ea90298888927705b462fa27fc26a9ebc/pkg/runtime/framework/plugins/mpi/mpi_test.go#L103

We should refactor these unit tests to align with other plugins as part of: #3179
@andreyvelich I have my test fixed locally, but the larger build is failing with the update from the rebase: |
This is an update to #3064 to include (what comes down to) a rebase. I did a fresh clone and re-applied the changes. Please see the description and review there!