chore(spark): migrate SDK to kubeflow_spark_api Pydantic models#295
tariq-hasan wants to merge 5 commits into kubeflow:main
Conversation
🎉 Welcome to the Kubeflow SDK! 🎉 Thanks for opening your first PR! We're happy to have you as part of our community 🚀 Here's what happens next:
Join the community:
Feel free to ask questions in the comments if you need any help or clarification!
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of the listed files; approvers can indicate their approval by writing a comment. The full list of commands accepted by this bot can be found here.
Pull Request Test Coverage Report for Build 22043884336
💛 - Coveralls
Pull request overview
This PR refactors the Spark SDK to use typed Pydantic models from kubeflow_spark_api instead of raw dictionaries for CRD construction. This aligns the Spark SDK with the established architecture pattern used by the Trainer SDK.
Changes:
- Added `kubeflow-spark-api>=2.3.0` dependency and migrated from dict-based CRD construction to typed Pydantic models
- Updated all option implementations to work with Pydantic models instead of dictionaries
- Refactored backend methods to convert to/from Pydantic models at the Kubernetes API boundary
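The boundary-conversion pattern described above can be sketched as follows. The class and function names here are illustrative stand-ins (a plain dataclass rather than the generated `kubeflow_spark_api` Pydantic model, and hypothetical helper names), but the shape matches the change: typed models everywhere inside the SDK, dicts only at the Kubernetes API boundary.

```python
from dataclasses import dataclass, field
from typing import Any


# Illustrative stand-in for the generated SparkConnect model; the real class
# is a Pydantic model exposing to_dict()/from_dict().
@dataclass
class SparkConnect:
    api_version: str = "sparkoperator.k8s.io/v1alpha1"  # illustrative group/version
    kind: str = "SparkConnect"
    spec: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        return {"apiVersion": self.api_version, "kind": self.kind, "spec": self.spec}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SparkConnect":
        return cls(
            api_version=data["apiVersion"],
            kind=data["kind"],
            spec=data.get("spec", {}),
        )


def create_spark_connect(model: SparkConnect) -> dict[str, Any]:
    # Typed model -> dict right at the Kubernetes API boundary;
    # the custom-objects API consumes plain dicts.
    return model.to_dict()


def get_spark_connect_cr(raw: dict[str, Any]) -> SparkConnect:
    # Dict from the Kubernetes API -> typed model for all SDK-internal code.
    return SparkConnect.from_dict(raw)


cr = SparkConnect(spec={"sparkVersion": "4.0.0"})
roundtrip = get_spark_connect_cr(create_spark_connect(cr))
assert roundtrip.spec["sparkVersion"] == "4.0.0"
```

Keeping the dict conversion confined to two boundary functions means type checkers and validation cover every layer above them.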
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pyproject.toml | Added kubeflow-spark-api>=2.3.0 dependency |
| uv.lock | Lock file updates for new kubeflow-spark-api dependency |
| kubeflow/spark/backends/kubernetes/backend.py | Convert between dict and Pydantic models at API boundary using .to_dict() and .from_dict() |
| kubeflow/spark/backends/kubernetes/utils.py | Refactored build_spark_connect_crd to return Pydantic model; renamed parse_spark_connect_status to get_spark_connect_info_from_cr with Pydantic input |
| kubeflow/spark/types/options.py | Updated all option callables to accept SparkConnect Pydantic model instead of dict |
| kubeflow/spark/backends/kubernetes/backend_test.py | Enhanced mock responses to include all required fields for Pydantic model validation |
| kubeflow/spark/backends/kubernetes/utils_test.py | Updated tests to work with Pydantic models and added validation test for invalid CR |
| kubeflow/spark/types/options_test.py | Migrated tests to use spark_connect_model fixture and verify Pydantic model attributes |
| hack/Dockerfile.spark-e2e-runner | Added --pre flag to allow installation of pre-release versions |
```python
role_spec.template = models.IoK8sApiCoreV1PodTemplateSpec()

# Convert existing template to dict, merge, and convert back
existing_dict = role_spec.template.to_dict() if role_spec.template else {}
```

Redundant `None` check in ternary expression. Since `role_spec.template` is guaranteed to be non-`None` after line 193, the ternary expression `role_spec.template.to_dict() if role_spec.template else {}` can be simplified to:

```python
existing_dict = role_spec.template.to_dict()
```
Force-pushed from 2df7db9 to 255b2ad.
/assign @Shekharrajak
andreyvelich left a comment:
Thanks @tariq-hasan!
Overall, looks great.
cc @kubeflow/kubeflow-sdk-team @Fiona-Waters @abhijeet-dhumal @jaiakash
```diff
@@ -9,7 +9,7 @@
 COPY pyproject.toml README.md LICENSE ./
 COPY kubeflow/ kubeflow/
```
Can we remove these filters and run Spark E2E tests for every PR?
https://github.com/tariq-hasan/sdk/blob/255b2ad2e3f953b3aa78deebd4b20a137eb0667c/.github/workflows/test-spark-examples.yaml#L4-L12
It should be fine to run them on every PR, like we do for other tests.
For example, we didn't trigger Spark tests when the PySpark dependency was updated: #300
Sounds good. I have removed the paths.
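For reference, dropping the path filters so the workflow runs on every PR would look roughly like this; the exact trigger names are an assumption based on a typical GitHub Actions layout, not the verbatim contents of `test-spark-examples.yaml`:

```yaml
# Before: only ran when Spark-related paths changed (assumed filter shown)
# on:
#   pull_request:
#     paths:
#       - kubeflow/spark/**

# After: run on every pull request, like the other test workflows
on:
  pull_request:
```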
```python
base_response = {
    "apiVersion": f"{constants.SPARK_CONNECT_GROUP}/{constants.SPARK_CONNECT_VERSION}",
    "kind": constants.SPARK_CONNECT_KIND,
    "spec": {
        "sparkVersion": constants.DEFAULT_SPARK_VERSION,
        "image": constants.DEFAULT_SPARK_IMAGE,
        "server": {
            "cores": constants.DEFAULT_DRIVER_CPU,
            "memory": constants.DEFAULT_DRIVER_MEMORY,
        },
        "executor": {
            "instances": 2,
            "cores": constants.DEFAULT_EXECUTOR_CPU,
            "memory": constants.DEFAULT_EXECUTOR_MEMORY,
        },
    },
}
```

Can you refactor it to use Spark models, like we do in Trainer: https://github.com/tariq-hasan/sdk/blob/255b2ad2e3f953b3aa78deebd4b20a137eb0667c/kubeflow/trainer/backends/kubernetes/backend_test.py#L312 ?
I have introduced the get_spark_connect function to return a typed model in place of the dict-based approach.
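A test helper along the lines of the `get_spark_connect` mentioned above might look roughly like this. The nested classes are plain dataclass stand-ins for the generated models (e.g. `models.SparkV1alpha1SparkConnect`), and the default values are illustrative, not the SDK's actual constants:

```python
from dataclasses import dataclass, field


# Plain stand-ins for the generated kubeflow_spark_api Pydantic models.
@dataclass
class ServerSpec:
    cores: int = 1
    memory: str = "1g"


@dataclass
class ExecutorSpec:
    instances: int = 2
    cores: int = 1
    memory: str = "1g"


@dataclass
class SparkConnectSpec:
    spark_version: str = "4.0.0"
    image: str = "spark:4.0.0"
    server: ServerSpec = field(default_factory=ServerSpec)
    executor: ExecutorSpec = field(default_factory=ExecutorSpec)


@dataclass
class SparkConnect:
    api_version: str = "sparkoperator.k8s.io/v1alpha1"
    kind: str = "SparkConnect"
    spec: SparkConnectSpec = field(default_factory=SparkConnectSpec)


def get_spark_connect(num_executors: int = 2) -> SparkConnect:
    """Return a typed base SparkConnect model for tests, replacing the dict literal."""
    return SparkConnect(
        spec=SparkConnectSpec(executor=ExecutorSpec(instances=num_executors))
    )


sc = get_spark_connect(num_executors=3)
assert sc.spec.executor.instances == 3
```

Centralizing the base object in one typed factory keeps every test's expected CR in sync when a default changes.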
| """ | ||
| base_spec = { | ||
| "sparkVersion": constants.DEFAULT_SPARK_VERSION, | ||
| "image": constants.DEFAULT_SPARK_IMAGE, | ||
| "server": { | ||
| "cores": constants.DEFAULT_DRIVER_CPU, | ||
| "memory": constants.DEFAULT_DRIVER_MEMORY, | ||
| }, | ||
| "executor": { | ||
| "instances": 2, | ||
| "cores": constants.DEFAULT_EXECUTOR_CPU, | ||
| "memory": constants.DEFAULT_EXECUTOR_MEMORY, | ||
| }, |
There was a problem hiding this comment.
The get_spark_connect function is used here now as well.
```python
    API ExecutorSpec model.
    """
    # Determine number of instances
    if executor and executor.num_instances is not None:
```

Suggested change:

```python
    if executor and executor.num_instances:
```
I have updated the code to remove `is not None`.
```python
    # Determine number of instances
    if executor and executor.num_instances is not None:
        instances = executor.num_instances
    elif num_executors is not None:
```

Suggested change:

```python
    elif num_executors:
```
I have updated the code here as well to remove `is not None`.
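One subtlety worth noting with this simplification: a truthiness check treats `0` the same as `None`, so an explicit request for zero instances would fall through to the default, whereas `is not None` preserves it. A minimal illustration (function names are hypothetical):

```python
def pick_instances(num_instances, default=2):
    # Truthiness check: 0 is falsy, so it falls through to the default.
    return num_instances if num_instances else default


def pick_instances_strict(num_instances, default=2):
    # Identity check: only None falls through; an explicit 0 is honored.
    return num_instances if num_instances is not None else default


assert pick_instances(0) == 2          # 0 silently replaced by the default
assert pick_instances_strict(0) == 0   # 0 preserved
assert pick_instances(None) == pick_instances_strict(None) == 2
```

If zero executors is never a valid request here, the two forms are equivalent in practice.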
```diff
@@ -99,8 +184,8 @@ def build_spark_connect_crd(
     executor: Optional[Executor] = None,
```

I still think we should remove the driver and executor spec from the first released version, and extend it later.
@Shekharrajak Do you have any particular use-case when users want to set it?

Suggested change (remove the parameter):

```diff
-    executor: Optional[Executor] = None,
```
| name=metadata.get("name", ""), | ||
| namespace=metadata.get("namespace", ""), | ||
| name=spark_connect_cr.metadata.name, | ||
| namespace=spark_connect_cr.metadata.namespace or "", |
There was a problem hiding this comment.
namespace cannot be none
| namespace=spark_connect_cr.metadata.namespace or "", | |
| namespace=spark_connect_cr.metadata.namespace, |
There was a problem hiding this comment.
I have made the change.
```python
crd = spark_connect.to_dict()
assert crd["spec"]["executor"]["cores"] == 2
assert crd["spec"]["executor"]["memory"] == "4g"
```

Can you refactor these tests to just access the object fields? You don't need to run `to_dict()`, e.g. `assert crd.spec.executor.cores == 2`.

Sounds good. I have made the change to use the `spark_connect` typed model directly and removed the `to_dict()` conversion.
| "pydantic>=2.10.0", | ||
| "kubeflow-trainer-api>=2.0.0", | ||
| "kubeflow-katib-api>=0.19.0", | ||
| "kubeflow-spark-api>=2.3.0", |
There was a problem hiding this comment.
Can you add this to the spark extras, alongside pyspark.
There was a problem hiding this comment.
I have added it there as well.
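Adding the API package to the spark extras would look roughly like this in `pyproject.toml`; the surrounding extras layout and the pyspark pin shown are assumptions for illustration:

```toml
[project.optional-dependencies]
spark = [
    "pyspark>=4.0.0",
    "kubeflow-spark-api>=2.3.0",
]
```

This way `pip install kubeflow[spark]` pulls in both the Spark client and the generated API models.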
Signed-off-by: tariq-hasan <mmtariquehsn@gmail.com>
Force-pushed from 255b2ad to 740fbf4.
I have rebased the PR as well to account for the changes coming from #288.
```python
def build_spark_connect_crd(
```

Suggested change:

```python
def build_spark_connect_cr(
```
```diff
-) -> dict[str, Any]:
-    """Build SparkConnect CRD manifest (KEP-107 compliant).
+) -> models.SparkV1alpha1SparkConnect:
+    """Build SparkConnect CRD using typed API models (KEP-107 compliant).
```

Suggested change:

```diff
-    """Build SparkConnect CRD using typed API models (KEP-107 compliant).
+    """Build SparkConnect CR using typed API models (KEP-107 compliant).
```
```diff
     Returns:
-        SparkConnect CRD as dictionary.
+        SparkConnect CRD as typed Pydantic model.
```

Suggested change:

```diff
-        SparkConnect CRD as typed Pydantic model.
+        SparkConnect CR as typed Pydantic model.
```
What this PR does / why we need it:
This PR migrates the Spark SDK from constructing CRDs with raw dictionaries to the typed Pydantic models provided by `kubeflow_spark_api`. There are no user-facing API changes in this PR.

What changed:
- Status parsing uses `kubeflow_spark_api` models via `.from_dict()` instead of manual extraction
- The public user-facing types (`Driver`, `Executor`, `SparkConnectInfo`) are unchanged

Testing:
Tested against `kubeflow_spark_api==2.4.0rc0`.

Which issue(s) this PR fixes (optional, in `Fixes #<issue number>, #<issue number>, ...` format, will close the issue(s) when PR gets merged):
Fixes #271
Checklist: